diff --git a/ddl/backfilling.go b/ddl/backfilling.go index 01d8c3bc94a83..dbfe311fee0b0 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -69,7 +69,6 @@ const ( minGenPhysicalTableTaskBatch = 64 minDistTaskCnt = 64 retrySQLTimes = 10 - retrySQLInterval = 300 * time.Millisecond ) // RetrySQLInterval is export for test. @@ -96,9 +95,9 @@ type BackfillJob struct { JobID int64 EleID int64 EleKey []byte + PhysicalTableID int64 Tp backfillerType State model.JobState - PhysicalTableID int64 InstanceID string InstanceLease types.Time // range info @@ -699,8 +698,7 @@ func getBatchTasks(t table.Table, reorgInfo *reorgInfo, pID int64, kvRanges []kv if err != nil { logutil.BgLogger().Info("[ddl] get backfill range task, get reverse key failed", zap.Error(err)) } else { - logutil.BgLogger().Info("[ddl] get backfill range task, change end key", - zap.Int64("pID", pID), zap.Int64("pTbl", phyTbl.GetPhysicalID()), zap.Bool("mergeTIdx", reorgInfo.mergingTmpIdx), + logutil.BgLogger().Info("[ddl] get backfill range task, change end key", zap.Int64("pTbl", phyTbl.GetPhysicalID()), zap.String("end key", hex.EncodeToString(endKey)), zap.String("current end key", hex.EncodeToString(endK))) endKey = endK } diff --git a/ddl/dist_owner.go b/ddl/dist_owner.go index be8a0d3bb06f0..94337a02ad809 100644 --- a/ddl/dist_owner.go +++ b/ddl/dist_owner.go @@ -33,6 +33,7 @@ import ( tidbutil "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/dbterror" "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/mathutil" atomicutil "go.uber.org/atomic" "go.uber.org/zap" ) @@ -97,12 +98,8 @@ func getRunningPhysicalTableMetas(sess *session, sJobCtx *splitJobContext, reorg } else { for _, pMeta := range pTblMetas { phyTblMetas = append(phyTblMetas, pMeta) - if pMeta.PhyTblID > currPID { - currPID = pMeta.PhyTblID - } - if pMeta.ID > currBfJobID { - currBfJobID = pMeta.ID - } + currPID = mathutil.Max(pMeta.PhyTblID, currPID) + currBfJobID = mathutil.Max(pMeta.ID, currBfJobID) physicalTIDs = append(physicalTIDs, pMeta.PhyTblID) } } @@ -118,13 +115,11 @@ func (dc *ddlCtx) sendPhysicalTableMetas(reorgInfo *reorgInfo, t table.Table, sJ var err error physicalTIDs := make([]int64, 0, distPhysicalTableConcurrency) defer func() { + logutil.BgLogger().Info("[ddl] send physical table ranges to split finished", zap.Int64("jobID", reorgInfo.Job.ID), + zap.Stringer("ele", reorgInfo.currElement), zap.Int64s("phyTblIDs", physicalTIDs), zap.Error(err)) if err != nil { - logutil.BgLogger().Info("[ddl] send physical table ranges to split failed", zap.Int64("jobID", reorgInfo.Job.ID), - zap.Stringer("ele", reorgInfo.currElement), zap.Int64s("phyTblIDs", physicalTIDs), zap.Error(err)) sJobCtx.cancel() } else { - logutil.BgLogger().Info("[ddl] send physical table ranges to split finished", zap.Int64("jobID", reorgInfo.Job.ID), - zap.Stringer("ele", reorgInfo.currElement), zap.Int64s("phyTblIDs", physicalTIDs)) close(sJobCtx.phyTblMetaCh) } }() @@ -181,13 +176,7 @@ func (dc *ddlCtx) sendPhysicalTableMetas(reorgInfo *reorgInfo, t table.Table, sJ physicalTIDs = append(physicalTIDs, pID) } - } //else { - // //nolint:forcetypeassert - // phyTbl := t.(table.PhysicalTable) - // bfJM := &BackfillJobRangeMeta{PhyTblID: phyTbl.Meta().ID, PhyTbl: phyTbl, StartKey: reorgInfo.StartKey, EndKey: reorgInfo.EndKey} - // sJobCtx.phyTblMetaCh <- bfJM - // physicalTIDs = append(physicalTIDs, bfJM.PhyTblID) - // } + } } func (dc *ddlCtx) controlWriteTableRecord(sessPool *sessionPool, t table.Table, bfWorkerType backfillerType, reorgInfo *reorgInfo) error { @@ -468,7 +457,7 @@ func getBackfillJobWithRetry(sess *session, tableName string, ddlJobID, currEleI ddlJobID, currEleID, wrapKey2String(currEleKey)), "check_backfill_job_state") if err != nil { logutil.BgLogger().Warn("[ddl] GetBackfillJobs failed", zap.Error(err)) - time.Sleep(retrySQLInterval) + time.Sleep(RetrySQLInterval) continue } diff --git a/ddl/job_table.go b/ddl/job_table.go index ef5bd9cc49b0a..59ab4f1e35051 100644 --- a/ddl/job_table.go +++ b/ddl/job_table.go @@ -643,7 +643,6 @@ func generateInsertBackfillJobSQL(tableName string, backfillJobs []*BackfillJob) sqlBuilder.WriteString("insert into mysql.") sqlBuilder.WriteString(tableName) sqlBuilder.WriteString("(id, ddl_job_id, ele_id, ele_key, ddl_physical_tid, type, exec_id, exec_lease, state, curr_key, start_key, end_key, start_ts, finish_ts, row_count, backfill_meta) values") - jobs := "" for i, bj := range backfillJobs { mateByte, err := bj.Meta.Encode() if err != nil { @@ -656,7 +655,6 @@ func generateInsertBackfillJobSQL(tableName string, backfillJobs []*BackfillJob) sqlBuilder.WriteString(fmt.Sprintf("(%d, %d, %d, %s, %d, %d, '%s', '%s', %d, %s, %s, %s, %d, %d, %d, %s)", bj.ID, bj.JobID, bj.EleID, wrapKey2String(bj.EleKey), bj.PhysicalTableID, bj.Tp, bj.InstanceID, bj.InstanceLease, bj.State, wrapKey2String(bj.CurrKey), wrapKey2String(bj.StartKey), wrapKey2String(bj.EndKey), bj.StartTS, bj.FinishTS, bj.RowCount, wrapKey2String(mateByte))) - jobs += fmt.Sprintf("job:%#v; ", bj.AbbrStr()) } return sqlBuilder.String(), nil } @@ -743,12 +741,12 @@ func GetAndMarkBackfillJobsForOneEle(s *session, batch int, jobID int64, uuid st } leaseStr := currTime.Add(-lease).Format(types.TimeFormat) - getJobsSQL := fmt.Sprintf("(exec_ID = '' or exec_lease < '%v') and ddl_job_id = %d order by ddl_job_id, ele_key, ele_id limit %d", + getJobsSQL := fmt.Sprintf("(exec_ID = '' or exec_lease < '%v') and ddl_job_id = %d order by ddl_job_id, ele_id, ele_key limit %d", leaseStr, jobID, batch) if pTblID != GetJobWithoutPartition { if pTblID == 0 { rows, err := s.execute(context.Background(), - fmt.Sprintf("select ddl_physical_tid from mysql.%s group by ddl_job_id, ele_id, ele_key, ddl_physical_tid having sum(length(exec_id)) = 0 or (max(exec_lease) < '%s' and max(exec_lease) is not null) order by ddl_job_id, ele_key, ele_id, ddl_physical_tid limit 1", + fmt.Sprintf("select ddl_physical_tid from mysql.%s group by ddl_job_id, ele_id, ele_key, ddl_physical_tid having sum(length(exec_id)) = 0 or max(exec_lease) < '%s' order by ddl_job_id, ele_key, ele_id, ddl_physical_tid limit 1", BackfillTable, leaseStr), "get_mark_backfill_job") if err != nil { return errors.Trace(err) diff --git a/ddl/job_table_test.go b/ddl/job_table_test.go index 78dc8ff4f08e9..ae04c0eb6dd85 100644 --- a/ddl/job_table_test.go +++ b/ddl/job_table_test.go @@ -268,6 +268,7 @@ func TestSimpleExecBackfillJobs(t *testing.T) { eleID1 := int64(11) eleID2 := int64(22) eleID3 := int64(33) + noPID := int64(0) uuid := d.GetID() eleKey := meta.IndexElementKey instanceLease := ddl.InstanceLease @@ -276,7 +277,7 @@ func TestSimpleExecBackfillJobs(t *testing.T) { bJob, err := ddl.GetBackfillJobForOneEle(se, []int64{jobID1, jobID2}, instanceLease) require.NoError(t, err) require.Nil(t, bJob) - bJobs, err := ddl.GetAndMarkBackfillJobsForOneEle(se, 1, jobID1, uuid, 1, instanceLease) + bJobs, err := ddl.GetAndMarkBackfillJobsForOneEle(se, 1, jobID1, uuid, noPID, instanceLease) require.EqualError(t, err, dbterror.ErrDDLJobNotFound.FastGen("get zero backfill job").Error()) require.Nil(t, bJobs) allCnt, err := ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID1, eleID2), "check_backfill_job_count") @@ -319,7 +320,7 @@ func TestSimpleExecBackfillJobs(t *testing.T) { require.NoError(t, err) }) - bJobs, err = ddl.GetAndMarkBackfillJobsForOneEle(se, 1, jobID2, uuid, 1, instanceLease) + bJobs, err = ddl.GetAndMarkBackfillJobsForOneEle(se, 1, jobID2, uuid, noPID, instanceLease) require.NoError(t, err) require.Len(t, bJobs, 1) expectJob = bjTestCases[2] @@ -372,18 +373,34 @@ func TestSimpleExecBackfillJobs(t *testing.T) { // 7 jobID1 eleID1 "" 4 // 8 jobID1 eleID1 "" 3 // 9 jobID1 eleID1 "" 4 - checkAndClean := func(batch, jobCnt int, jobID int64) { + simpleCheck := func(batch, jobCnt int, bfJobIDs []int64, pID int64) { err = ddl.AddBackfillJobs(se, bPhyJobs) require.NoError(t, err) - bJobs, err = ddl.GetAndMarkBackfillJobsForOneEle(se, batch, jobID1, uuid, 0, instanceLease) + bJobs, err = ddl.GetAndMarkBackfillJobsForOneEle(se, batch, jobID1, uuid, pID, instanceLease) require.NoError(t, err) require.Len(t, bJobs, jobCnt) - require.Equal(t, jobID, bJobs[0].ID) + isExist := false + for _, id := range bfJobIDs { + if id == bJobs[0].ID { + isExist = true + } + } + require.True(t, isExist, fmt.Sprintf("expected ids:%v, actual id:%d", bfJobIDs, bJobs[0].ID)) err = ddl.RemoveBackfillJob(se, true, bJobs1[0]) require.NoError(t, err) } - checkAndClean(3, 2, 2) - bPhyJobs[6].InstanceLease = types.NewTime(types.FromGoTime(time.Now().Add(-time.Hour)), 0, 0) + type cntAndID struct { + batch int + bfJobCnt int + bfJobID []int64 + } + checkAndClean := func(expectRet1, expectRet2 cntAndID) { + simpleCheck(expectRet1.batch, expectRet1.bfJobCnt, expectRet1.bfJobID, noPID) + simpleCheck(expectRet2.batch, expectRet2.bfJobCnt, expectRet2.bfJobID, ddl.GetJobWithoutPartition) + } + checkAndClean(cntAndID{3, 3, []int64{0, 1, 3}}, + cntAndID{3, 3, []int64{0, 1, 3}}) + bPhyJobs[1].InstanceLease = types.NewTime(types.FromGoTime(time.Now().Add(-time.Hour).UTC()), 0, 0) // ID jobID eleID InstanceID InstanceLease PhysicalTableID // ----------------------------------------------------------------------- // 0 jobID2 eleID2 "" 1 @@ -391,17 +408,18 @@ func TestSimpleExecBackfillJobs(t *testing.T) { // 0 jobID2 eleID3 "" 1 // 1 jobID2 eleID3 "" 1 // 0 jobID1 eleID1 "" 1 - // 1 jobID1 eleID1 "uuid_1" 1 + // 1 jobID1 eleID1 "uuid_1" currentTime-hour 1 // 2 jobID1 eleID1 "" 2 // 3 jobID1 eleID1 "" 1 // 4 jobID1 eleID1 "" 3 // 5 jobID1 eleID1 "" 3 - // 6 jobID1 eleID1 "" currentTime-hour 2 + // 6 jobID1 eleID1 "" 2 // 7 jobID1 eleID1 "" 4 // 8 jobID1 eleID1 "" 3 // 9 jobID1 eleID1 "" 4 - checkAndClean(3, 2, 2) - bPhyJobs[6].InstanceLease = types.NewTime(types.FromGoTime(time.Now()), 0, 0) + checkAndClean(cntAndID{3, 3, []int64{0, 1, 3}}, + cntAndID{3, 3, []int64{0, 1, 3}}) + bPhyJobs[3].InstanceLease = types.NewTime(types.FromGoTime(time.Now().UTC()), 0, 0) // ID jobID eleID InstanceID InstanceLease PhysicalTableID // ----------------------------------------------------------------------- // 0 jobID2 eleID2 "" 1 @@ -409,17 +427,55 @@ func TestSimpleExecBackfillJobs(t *testing.T) { // 0 jobID2 eleID3 "" 1 // 1 jobID2 eleID3 "" 1 // 0 jobID1 eleID1 "" 1 - // 1 jobID1 eleID1 "uuid_1" 1 - // 2 jobID1 eleID1 "uuid_2" 2 - // 3 jobID1 eleID1 "" 1 + // 1 jobID1 eleID1 "uuid_1" currentTime-hour 1 + // 2 jobID1 eleID1 "" 2 + // 3 jobID1 eleID1 "" currentTime 1 // should not exist + // 4 jobID1 eleID1 "" 3 + // 5 jobID1 eleID1 "" 3 + // 6 jobID1 eleID1 "" 2 + // 7 jobID1 eleID1 "" 4 + // 8 jobID1 eleID1 "" 3 + // 9 jobID1 eleID1 "" 4 + checkAndClean(cntAndID{3, 2, []int64{2, 6}}, + cntAndID{3, 3, []int64{0, 1, 3}}) + bPhyJobs[6].InstanceLease = types.NewTime(types.FromGoTime(time.Now().UTC()), 0, 0) + // ID jobID eleID InstanceID InstanceLease PhysicalTableID + // ----------------------------------------------------------------------- + // 0 jobID2 eleID2 "" 1 + // 1 jobID2 eleID2 "" 1 + // 0 jobID2 eleID3 "" 1 + // 1 jobID2 eleID3 "" 1 + // 0 jobID1 eleID1 "" 1 + // 1 jobID1 eleID1 "uuid_1" currentTime-hour 1 + // 2 jobID1 eleID1 "" 2 + // 3 jobID1 eleID1 "" currentTime 1 // should not exist + // 4 jobID1 eleID1 "" 3 + // 5 jobID1 eleID1 "" 3 + // 6 jobID1 eleID1 "" currentTime 2 // should not exist + // 7 jobID1 eleID1 "" 4 + // 8 jobID1 eleID1 "" 3 + // 9 jobID1 eleID1 "" 4 + checkAndClean(cntAndID{3, 2, []int64{2, 6}}, + cntAndID{10, 10, []int64{0, 1, 3}}) + bPhyJobs[6].InstanceID = "uuid_2" + // ID jobID eleID InstanceID InstanceLease PhysicalTableID + // ----------------------------------------------------------------------- + // 0 jobID2 eleID2 "" 1 + // 1 jobID2 eleID2 "" 1 + // 0 jobID2 eleID3 "" 1 + // 1 jobID2 eleID3 "" 1 + // 0 jobID1 eleID1 "" 1 + // 1 jobID1 eleID1 "uuid_1" currentTime-hour 1 + // 2 jobID1 eleID1 "" 2 + // 3 jobID1 eleID1 "" currentTime 1 // should not exist // 4 jobID1 eleID1 "" 3 // 5 jobID1 eleID1 "" 3 - // 6 jobID1 eleID1 "" currentTime 2 + // 6 jobID1 eleID1 "uuid_2" currentTime 2 // should not exist // 7 jobID1 eleID1 "" 4 // 8 jobID1 eleID1 "" 3 // 9 jobID1 eleID1 "" 4 - bPhyJobs[2].InstanceID = "uuid_2" - checkAndClean(2, 2, 4) + checkAndClean(cntAndID{3, 3, []int64{4, 5, 8}}, + cntAndID{10, 9, []int64{0, 1, 3}}) // ID jobID eleID InstanceID InstanceLease PhysicalTableID // ----------------------------------------------------------------------- // 0 jobID2 eleID2 "" 1