ddl: update TestSimpleExecBackfillJobs
zimulala committed Feb 8, 2023
1 parent f697f17 commit 4b69f6c
Showing 4 changed files with 84 additions and 43 deletions.
6 changes: 2 additions & 4 deletions ddl/backfilling.go
@@ -69,7 +69,6 @@ const (
minGenPhysicalTableTaskBatch = 64
minDistTaskCnt = 64
retrySQLTimes = 10
retrySQLInterval = 300 * time.Millisecond
)

// RetrySQLInterval is export for test.
@@ -96,9 +95,9 @@ type BackfillJob struct {
JobID int64
EleID int64
EleKey []byte
PhysicalTableID int64
Tp backfillerType
State model.JobState
PhysicalTableID int64
InstanceID string
InstanceLease types.Time
// range info
@@ -699,8 +698,7 @@ func getBatchTasks(t table.Table, reorgInfo *reorgInfo, pID int64, kvRanges []kv
if err != nil {
logutil.BgLogger().Info("[ddl] get backfill range task, get reverse key failed", zap.Error(err))
} else {
logutil.BgLogger().Info("[ddl] get backfill range task, change end key",
zap.Int64("pID", pID), zap.Int64("pTbl", phyTbl.GetPhysicalID()), zap.Bool("mergeTIdx", reorgInfo.mergingTmpIdx),
logutil.BgLogger().Info("[ddl] get backfill range task, change end key", zap.Int64("pTbl", phyTbl.GetPhysicalID()),
zap.String("end key", hex.EncodeToString(endKey)), zap.String("current end key", hex.EncodeToString(endK)))
endKey = endK
}
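For reference, the trimmed log call in the getBatchTasks hunk above keeps three fields: pTbl, end key, and current end key. A standalone sketch of the same call shape, substituting a plain zap development logger for TiDB's logutil.BgLogger() and illustrative byte slices for the real keys, might look like this:

package main

import (
	"encoding/hex"

	"go.uber.org/zap"
)

func main() {
	// A plain zap development logger stands in for logutil.BgLogger().
	logger, err := zap.NewDevelopment()
	if err != nil {
		panic(err)
	}
	defer func() { _ = logger.Sync() }()

	// Illustrative keys only; the real values come from the reorg range in getBatchTasks.
	endKey := []byte{0x74, 0x80, 0x00}
	endK := []byte{0x74, 0x7f, 0xff}

	// Same field shape as the reduced log line in the hunk above.
	logger.Info("[ddl] get backfill range task, change end key",
		zap.Int64("pTbl", 101),
		zap.String("end key", hex.EncodeToString(endKey)),
		zap.String("current end key", hex.EncodeToString(endK)))
}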
25 changes: 7 additions & 18 deletions ddl/dist_owner.go
@@ -33,6 +33,7 @@ import (
tidbutil "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
atomicutil "go.uber.org/atomic"
"go.uber.org/zap"
)
@@ -97,12 +98,8 @@ func getRunningPhysicalTableMetas(sess *session, sJobCtx *splitJobContext, reorg
} else {
for _, pMeta := range pTblMetas {
phyTblMetas = append(phyTblMetas, pMeta)
if pMeta.PhyTblID > currPID {
currPID = pMeta.PhyTblID
}
if pMeta.ID > currBfJobID {
currBfJobID = pMeta.ID
}
currPID = mathutil.Max(pMeta.PhyTblID, currPID)
currBfJobID = mathutil.Max(pMeta.ID, currBfJobID)
physicalTIDs = append(physicalTIDs, pMeta.PhyTblID)
}
}
@@ -118,13 +115,11 @@ func (dc *ddlCtx) sendPhysicalTableMetas(reorgInfo *reorgInfo, t table.Table, sJ
var err error
physicalTIDs := make([]int64, 0, distPhysicalTableConcurrency)
defer func() {
logutil.BgLogger().Info("[ddl] send physical table ranges to split finished", zap.Int64("jobID", reorgInfo.Job.ID),
zap.Stringer("ele", reorgInfo.currElement), zap.Int64s("phyTblIDs", physicalTIDs), zap.Error(err))
if err != nil {
logutil.BgLogger().Info("[ddl] send physical table ranges to split failed", zap.Int64("jobID", reorgInfo.Job.ID),
zap.Stringer("ele", reorgInfo.currElement), zap.Int64s("phyTblIDs", physicalTIDs), zap.Error(err))
sJobCtx.cancel()
} else {
logutil.BgLogger().Info("[ddl] send physical table ranges to split finished", zap.Int64("jobID", reorgInfo.Job.ID),
zap.Stringer("ele", reorgInfo.currElement), zap.Int64s("phyTblIDs", physicalTIDs))
close(sJobCtx.phyTblMetaCh)
}
}()
@@ -181,13 +176,7 @@ func (dc *ddlCtx) sendPhysicalTableMetas(reorgInfo *reorgInfo, t table.Table, sJ

physicalTIDs = append(physicalTIDs, pID)
}
} //else {
// //nolint:forcetypeassert
// phyTbl := t.(table.PhysicalTable)
// bfJM := &BackfillJobRangeMeta{PhyTblID: phyTbl.Meta().ID, PhyTbl: phyTbl, StartKey: reorgInfo.StartKey, EndKey: reorgInfo.EndKey}
// sJobCtx.phyTblMetaCh <- bfJM
// physicalTIDs = append(physicalTIDs, bfJM.PhyTblID)
// }
}
}

func (dc *ddlCtx) controlWriteTableRecord(sessPool *sessionPool, t table.Table, bfWorkerType backfillerType, reorgInfo *reorgInfo) error {
@@ -468,7 +457,7 @@ func getBackfillJobWithRetry(sess *session, tableName string, ddlJobID, currEleI
ddlJobID, currEleID, wrapKey2String(currEleKey)), "check_backfill_job_state")
if err != nil {
logutil.BgLogger().Warn("[ddl] GetBackfillJobs failed", zap.Error(err))
time.Sleep(retrySQLInterval)
time.Sleep(RetrySQLInterval)
continue
}

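For context on the getRunningPhysicalTableMetas hunk above: the two hand-rolled if-greater-than comparisons are collapsed into mathutil.Max calls. A minimal, self-contained sketch of that fold-into-maximum pattern, using a local maxInt64 helper as a stand-in for util/mathutil (whose exact generic signature is not shown in this diff), could look like this:

package main

import "fmt"

// maxInt64 stands in for mathutil.Max: it returns the larger of two values.
func maxInt64(x, y int64) int64 {
	if x > y {
		return x
	}
	return y
}

func main() {
	// Fold each physical-table meta into the running maxima, mirroring the
	// rewritten loop body in the hunk above.
	metas := []struct{ PhyTblID, ID int64 }{{PhyTblID: 3, ID: 7}, {PhyTblID: 9, ID: 2}}
	var currPID, currBfJobID int64
	for _, m := range metas {
		currPID = maxInt64(m.PhyTblID, currPID)
		currBfJobID = maxInt64(m.ID, currBfJobID)
	}
	fmt.Println(currPID, currBfJobID) // prints: 9 7
}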
6 changes: 2 additions & 4 deletions ddl/job_table.go
@@ -643,7 +643,6 @@ func generateInsertBackfillJobSQL(tableName string, backfillJobs []*BackfillJob)
sqlBuilder.WriteString("insert into mysql.")
sqlBuilder.WriteString(tableName)
sqlBuilder.WriteString("(id, ddl_job_id, ele_id, ele_key, ddl_physical_tid, type, exec_id, exec_lease, state, curr_key, start_key, end_key, start_ts, finish_ts, row_count, backfill_meta) values")
jobs := ""
for i, bj := range backfillJobs {
mateByte, err := bj.Meta.Encode()
if err != nil {
@@ -656,7 +655,6 @@ func generateInsertBackfillJobSQL(tableName string, backfillJobs []*BackfillJob)
sqlBuilder.WriteString(fmt.Sprintf("(%d, %d, %d, %s, %d, %d, '%s', '%s', %d, %s, %s, %s, %d, %d, %d, %s)", bj.ID, bj.JobID, bj.EleID,
wrapKey2String(bj.EleKey), bj.PhysicalTableID, bj.Tp, bj.InstanceID, bj.InstanceLease, bj.State, wrapKey2String(bj.CurrKey),
wrapKey2String(bj.StartKey), wrapKey2String(bj.EndKey), bj.StartTS, bj.FinishTS, bj.RowCount, wrapKey2String(mateByte)))
jobs += fmt.Sprintf("job:%#v; ", bj.AbbrStr())
}
return sqlBuilder.String(), nil
}
@@ -743,12 +741,12 @@ func GetAndMarkBackfillJobsForOneEle(s *session, batch int, jobID int64, uuid st
}
leaseStr := currTime.Add(-lease).Format(types.TimeFormat)

getJobsSQL := fmt.Sprintf("(exec_ID = '' or exec_lease < '%v') and ddl_job_id = %d order by ddl_job_id, ele_key, ele_id limit %d",
getJobsSQL := fmt.Sprintf("(exec_ID = '' or exec_lease < '%v') and ddl_job_id = %d order by ddl_job_id, ele_id, ele_key limit %d",
leaseStr, jobID, batch)
if pTblID != GetJobWithoutPartition {
if pTblID == 0 {
rows, err := s.execute(context.Background(),
fmt.Sprintf("select ddl_physical_tid from mysql.%s group by ddl_job_id, ele_id, ele_key, ddl_physical_tid having sum(length(exec_id)) = 0 or (max(exec_lease) < '%s' and max(exec_lease) is not null) order by ddl_job_id, ele_key, ele_id, ddl_physical_tid limit 1",
fmt.Sprintf("select ddl_physical_tid from mysql.%s group by ddl_job_id, ele_id, ele_key, ddl_physical_tid having sum(length(exec_id)) = 0 or max(exec_lease) < '%s' order by ddl_job_id, ele_key, ele_id, ddl_physical_tid limit 1",
BackfillTable, leaseStr), "get_mark_backfill_job")
if err != nil {
return errors.Trace(err)
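As a quick illustration of the GetAndMarkBackfillJobsForOneEle hunk above, where the ORDER BY switches from ele_key, ele_id to ele_id, ele_key: the filter is a plain formatted fragment handed to the query helper. A standalone sketch with illustrative values (the real leaseStr, jobID, and batch come from the surrounding function) might be:

package main

import "fmt"

func main() {
	// Illustrative values only; GetAndMarkBackfillJobsForOneEle derives leaseStr
	// from the current time minus the instance lease.
	leaseStr := "2023-02-08 00:00:00"
	jobID := int64(1)
	batch := 3

	getJobsSQL := fmt.Sprintf(
		"(exec_ID = '' or exec_lease < '%v') and ddl_job_id = %d order by ddl_job_id, ele_id, ele_key limit %d",
		leaseStr, jobID, batch)
	fmt.Println(getJobsSQL)
}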
90 changes: 73 additions & 17 deletions ddl/job_table_test.go
@@ -268,6 +268,7 @@ func TestSimpleExecBackfillJobs(t *testing.T) {
eleID1 := int64(11)
eleID2 := int64(22)
eleID3 := int64(33)
noPID := int64(0)
uuid := d.GetID()
eleKey := meta.IndexElementKey
instanceLease := ddl.InstanceLease
@@ -276,7 +277,7 @@
bJob, err := ddl.GetBackfillJobForOneEle(se, []int64{jobID1, jobID2}, instanceLease)
require.NoError(t, err)
require.Nil(t, bJob)
bJobs, err := ddl.GetAndMarkBackfillJobsForOneEle(se, 1, jobID1, uuid, 1, instanceLease)
bJobs, err := ddl.GetAndMarkBackfillJobsForOneEle(se, 1, jobID1, uuid, noPID, instanceLease)
require.EqualError(t, err, dbterror.ErrDDLJobNotFound.FastGen("get zero backfill job").Error())
require.Nil(t, bJobs)
allCnt, err := ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID1, eleID2), "check_backfill_job_count")
@@ -319,7 +320,7 @@ func TestSimpleExecBackfillJobs(t *testing.T) {
require.NoError(t, err)
})

bJobs, err = ddl.GetAndMarkBackfillJobsForOneEle(se, 1, jobID2, uuid, 1, instanceLease)
bJobs, err = ddl.GetAndMarkBackfillJobsForOneEle(se, 1, jobID2, uuid, noPID, instanceLease)
require.NoError(t, err)
require.Len(t, bJobs, 1)
expectJob = bjTestCases[2]
@@ -372,54 +373,109 @@
// 7   jobID1  eleID1  ""                             4
// 8   jobID1  eleID1  ""                             3
// 9   jobID1  eleID1  ""                             4
checkAndClean := func(batch, jobCnt int, jobID int64) {
simpleCheck := func(batch, jobCnt int, bfJobIDs []int64, pID int64) {
err = ddl.AddBackfillJobs(se, bPhyJobs)
require.NoError(t, err)
bJobs, err = ddl.GetAndMarkBackfillJobsForOneEle(se, batch, jobID1, uuid, 0, instanceLease)
bJobs, err = ddl.GetAndMarkBackfillJobsForOneEle(se, batch, jobID1, uuid, pID, instanceLease)
require.NoError(t, err)
require.Len(t, bJobs, jobCnt)
require.Equal(t, jobID, bJobs[0].ID)
isExist := false
for _, id := range bfJobIDs {
if id == bJobs[0].ID {
isExist = true
}
}
require.True(t, isExist, fmt.Sprintf("expected ids:%v, actual id:%d", bfJobIDs, bJobs[0].ID))
err = ddl.RemoveBackfillJob(se, true, bJobs1[0])
require.NoError(t, err)
}
checkAndClean(3, 2, 2)
bPhyJobs[6].InstanceLease = types.NewTime(types.FromGoTime(time.Now().Add(-time.Hour)), 0, 0)
type cntAndID struct {
batch int
bfJobCnt int
bfJobID []int64
}
checkAndClean := func(expectRet1, expectRet2 cntAndID) {
simpleCheck(expectRet1.batch, expectRet1.bfJobCnt, expectRet1.bfJobID, noPID)
simpleCheck(expectRet2.batch, expectRet2.bfJobCnt, expectRet2.bfJobID, ddl.GetJobWithoutPartition)
}
checkAndClean(cntAndID{3, 3, []int64{0, 1, 3}},
cntAndID{3, 3, []int64{0, 1, 3}})
bPhyJobs[1].InstanceLease = types.NewTime(types.FromGoTime(time.Now().Add(-time.Hour).UTC()), 0, 0)
// ID  jobID   eleID   InstanceID  InstanceLease      PhysicalTableID
// -----------------------------------------------------------------------
// 0   jobID2  eleID2  ""                             1
// 1   jobID2  eleID2  ""                             1
// 0   jobID2  eleID3  ""                             1
// 1   jobID2  eleID3  ""                             1
// 0   jobID1  eleID1  ""                             1
// 1   jobID1  eleID1  "uuid_1"                       1
// 1   jobID1  eleID1  "uuid_1"    currentTime-hour   1
// 2   jobID1  eleID1  ""                             2
// 3   jobID1  eleID1  ""                             1
// 4   jobID1  eleID1  ""                             3
// 5   jobID1  eleID1  ""                             3
// 6   jobID1  eleID1  ""          currentTime-hour   2
// 6   jobID1  eleID1  ""                             2
// 7   jobID1  eleID1  ""                             4
// 8   jobID1  eleID1  ""                             3
// 9   jobID1  eleID1  ""                             4
checkAndClean(3, 2, 2)
bPhyJobs[6].InstanceLease = types.NewTime(types.FromGoTime(time.Now()), 0, 0)
checkAndClean(cntAndID{3, 3, []int64{0, 1, 3}},
cntAndID{3, 3, []int64{0, 1, 3}})
bPhyJobs[3].InstanceLease = types.NewTime(types.FromGoTime(time.Now().UTC()), 0, 0)
// ID  jobID   eleID   InstanceID  InstanceLease      PhysicalTableID
// -----------------------------------------------------------------------
// 0   jobID2  eleID2  ""                             1
// 1   jobID2  eleID2  ""                             1
// 0   jobID2  eleID3  ""                             1
// 1   jobID2  eleID3  ""                             1
// 0   jobID1  eleID1  ""                             1
// 1   jobID1  eleID1  "uuid_1"                       1
// 2   jobID1  eleID1  "uuid_2"                       2
// 3   jobID1  eleID1  ""                             1
// 1   jobID1  eleID1  "uuid_1"    currentTime-hour   1
// 2   jobID1  eleID1  ""                             2
// 3   jobID1  eleID1  ""          currentTime        1    // should not exist
// 4   jobID1  eleID1  ""                             3
// 5   jobID1  eleID1  ""                             3
// 6   jobID1  eleID1  ""                             2
// 7   jobID1  eleID1  ""                             4
// 8   jobID1  eleID1  ""                             3
// 9   jobID1  eleID1  ""                             4
checkAndClean(cntAndID{3, 2, []int64{2, 6}},
cntAndID{3, 3, []int64{0, 1, 3}})
bPhyJobs[6].InstanceLease = types.NewTime(types.FromGoTime(time.Now().UTC()), 0, 0)
// ID  jobID   eleID   InstanceID  InstanceLease      PhysicalTableID
// -----------------------------------------------------------------------
// 0   jobID2  eleID2  ""                             1
// 1   jobID2  eleID2  ""                             1
// 0   jobID2  eleID3  ""                             1
// 1   jobID2  eleID3  ""                             1
// 0   jobID1  eleID1  ""                             1
// 1   jobID1  eleID1  "uuid_1"    currentTime-hour   1
// 2   jobID1  eleID1  ""                             2
// 3   jobID1  eleID1  ""          currentTime        1    // should not exist
// 4   jobID1  eleID1  ""                             3
// 5   jobID1  eleID1  ""                             3
// 6   jobID1  eleID1  ""          currentTime        2    // should not exist
// 7   jobID1  eleID1  ""                             4
// 8   jobID1  eleID1  ""                             3
// 9   jobID1  eleID1  ""                             4
checkAndClean(cntAndID{3, 2, []int64{2, 6}},
cntAndID{10, 10, []int64{0, 1, 3}})
bPhyJobs[6].InstanceID = "uuid_2"
// ID  jobID   eleID   InstanceID  InstanceLease      PhysicalTableID
// -----------------------------------------------------------------------
// 0   jobID2  eleID2  ""                             1
// 1   jobID2  eleID2  ""                             1
// 0   jobID2  eleID3  ""                             1
// 1   jobID2  eleID3  ""                             1
// 0   jobID1  eleID1  ""                             1
// 1   jobID1  eleID1  "uuid_1"    currentTime-hour   1
// 2   jobID1  eleID1  ""                             2
// 3   jobID1  eleID1  ""          currentTime        1    // should not exist
// 4   jobID1  eleID1  ""                             3
// 5   jobID1  eleID1  ""                             3
// 6   jobID1  eleID1  ""          currentTime        2
// 6   jobID1  eleID1  "uuid_2"    currentTime        2    // should not exist
// 7   jobID1  eleID1  ""                             4
// 8   jobID1  eleID1  ""                             3
// 9   jobID1  eleID1  ""                             4
bPhyJobs[2].InstanceID = "uuid_2"
checkAndClean(2, 2, 4)
checkAndClean(cntAndID{3, 3, []int64{4, 5, 8}},
cntAndID{10, 9, []int64{0, 1, 3}})
// ID  jobID   eleID   InstanceID  InstanceLease      PhysicalTableID
// -----------------------------------------------------------------------
// 0   jobID2  eleID2  ""                             1
