Skip to content

Commit

Permalink
ddl, parser: add comments
Browse files Browse the repository at this point in the history
  • Loading branch information
zimulala committed Nov 9, 2022
1 parent 31c4db1 commit d7c6bc7
Show file tree
Hide file tree
Showing 10 changed files with 103 additions and 66 deletions.
33 changes: 19 additions & 14 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ const (
typeCleanUpIndexWorker backfillWorkerType = 2
typeAddIndexMergeTmpWorker backfillWorkerType = 3

// InstanceLease is the instance lease.
InstanceLease = 60 // s
)

Expand All @@ -83,24 +84,27 @@ func IsDistReorgEnable() bool {
return enableDistReorg.Load()
}

// BackfillJob is for a tidb_ddl_backfill table's record.
type BackfillJob struct {
ID int64
JobID int64
EleID int64
EleKey []byte
Tp model.BackfillType
State model.JobState
StoreID int64
Instance_ID string
Instance_Lease types.Time
Mate *model.BackfillMeta
}

ID int64
JobID int64
EleID int64
EleKey []byte
Tp model.BackfillType
State model.JobState
StoreID int64
InstanceID string
InstanceLease types.Time
Mate *model.BackfillMeta
}

// AbbrStr returns the BackfillJob's info without the Mate info.
func (bj *BackfillJob) AbbrStr() string {
return fmt.Sprintf("ID:%d, JobID:%d, EleID:%d, Type:%s, State:%s, Instance_ID:%s, Instance_Lease:%s",
bj.ID, bj.JobID, bj.EleID, bj.Tp, bj.State, bj.Instance_ID, bj.Instance_Lease)
return fmt.Sprintf("ID:%d, JobID:%d, EleID:%d, Type:%s, State:%s, InstanceID:%s, InstanceLease:%s",
bj.ID, bj.JobID, bj.EleID, bj.Tp, bj.State, bj.InstanceID, bj.InstanceLease)
}

// GetOracleTime returns the current time from TS.
func GetOracleTime(store kv.Storage) (time.Time, error) {
currentVer, err := store.CurrentVersion(kv.GlobalTxnScope)
if err != nil {
Expand All @@ -109,6 +113,7 @@ func GetOracleTime(store kv.Storage) (time.Time, error) {
return oracle.GetTimeFromTS(currentVer.Ver), nil
}

// GetLeaseGoTime returns a types.Time by adding a lease.
func GetLeaseGoTime(currTime time.Time, lease time.Duration) types.Time {
leaseTime := currTime.Add(lease)
return types.NewTime(types.FromGoTime(leaseTime.In(time.UTC)), mysql.TypeTimestamp, types.MaxFsp)
Expand Down
1 change: 1 addition & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1700,6 +1700,7 @@ type session struct {
sessionctx.Context
}

// NewSession news the session and it is export for testing.
func NewSession(s sessionctx.Context) *session {
return &session{s}
}
Expand Down
42 changes: 26 additions & 16 deletions ddl/job_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,7 @@ func getJobsBySQL(sess *session, tbl, condition string) ([]*model.Job, error) {
return jobs, nil
}

// AddBackfillJobs adds the backfill jobs to the tidb_ddl_backfill table.
func AddBackfillJobs(sess *session, backfillJobs []*BackfillJob) error {
sqlPrefix := "insert into mysql.tidb_ddl_backfill(section_id, job_id, ele_id, ele_key, store_id, type, exec_id, exec_lease, state, backfill_meta) values"
var sql string
Expand All @@ -517,17 +518,18 @@ func AddBackfillJobs(sess *session, backfillJobs []*BackfillJob) error {

if i == 0 {
sql = sqlPrefix + fmt.Sprintf("(%d, %d, %d, '%s', %d, %d, '%s', '%s', %d, '%s')",
bj.ID, bj.JobID, bj.EleID, bj.EleKey, bj.StoreID, bj.Tp, bj.Instance_ID, bj.Instance_Lease, bj.State, mateByte)
bj.ID, bj.JobID, bj.EleID, bj.EleKey, bj.StoreID, bj.Tp, bj.InstanceID, bj.InstanceLease, bj.State, mateByte)
continue
}
sql += fmt.Sprintf(", (%d, %d, %d, '%s', %d, %d, '%s', '%s', %d, '%s')",
bj.ID, bj.JobID, bj.EleID, bj.EleKey, bj.StoreID, bj.Tp, bj.Instance_ID, bj.Instance_Lease, bj.State, mateByte)
bj.ID, bj.JobID, bj.EleID, bj.EleKey, bj.StoreID, bj.Tp, bj.InstanceID, bj.InstanceLease, bj.State, mateByte)
}
_, err = sess.execute(context.Background(), sql, "add_backfill_jobs")
return errors.Trace(err)
})
}

// GetBackfillJobsForOneEle batch gets the backfill jobs in the tblName table that contains only one element.
func GetBackfillJobsForOneEle(sess *session, batch int, isInclude bool, jobIDs []int64, lease time.Duration) ([]*BackfillJob, error) {
currTime, err := GetOracleTime(sess.GetStore())
if err != nil {
Expand Down Expand Up @@ -559,6 +561,8 @@ func GetBackfillJobsForOneEle(sess *session, batch int, isInclude bool, jobIDs [
return jobs[:validLen], nil
}

// GetAndMarkBackfillJobsForOneEle batch gets the backfill jobs in the tblName table that contains only one element,
// and update these jobs with instance ID and lease.
func GetAndMarkBackfillJobsForOneEle(sess *session, batch int, isInclude bool, jobIDs []int64, uuid string, lease time.Duration) ([]*BackfillJob, error) {
currTime, err := GetOracleTime(sess.GetStore())
if err != nil {
Expand Down Expand Up @@ -593,8 +597,8 @@ func GetAndMarkBackfillJobsForOneEle(sess *session, batch int, isInclude bool, j
}
validLen++

jobs[i].Instance_ID = uuid
jobs[i].Instance_Lease = GetLeaseGoTime(currTime, lease)
jobs[i].InstanceID = uuid
jobs[i].InstanceLease = GetLeaseGoTime(currTime, lease)
// TODO: batch update
if err = updateBackfillJob(sess, jobs[i], "get_mark_backfill_job"); err != nil {
return err
Expand All @@ -609,6 +613,7 @@ func GetAndMarkBackfillJobsForOneEle(sess *session, batch int, isInclude bool, j
return jobs[:validLen], err
}

// GetInterruptedBackfillJobsForOneEle gets the interrupted backfill jobs in the tblName table that contains only one element.
func GetInterruptedBackfillJobsForOneEle(sess *session, jobID, eleID int64, eleKey []byte) ([]*BackfillJob, error) {
jobs, err := GetBackfillJobs(sess, BackfillTable, fmt.Sprintf("job_id = %d and ele_id = %d and ele_key = '%s' and (state = %d or state = %d)",
jobID, eleID, eleKey, model.JobStateRollingback, model.JobStateCancelling), "get_interrupt_backfill_job")
Expand All @@ -627,6 +632,7 @@ func GetInterruptedBackfillJobsForOneEle(sess *session, jobID, eleID int64, eleK
return jobs[:validLen], nil
}

// GetBackfillJobCount gets the number of rows in the tblName table according to condition.
func GetBackfillJobCount(sess *session, tblName, condition string, label string) (int, error) {
rows, err := sess.execute(context.Background(), fmt.Sprintf("select count(1) from mysql.%s where %s", tblName, condition), label)
if err != nil {
Expand All @@ -639,6 +645,7 @@ func GetBackfillJobCount(sess *session, tblName, condition string, label string)
return int(rows[0].GetInt64(0)), nil
}

// GetBackfillJobs gets the backfill jobs in the tblName table according to condition.
func GetBackfillJobs(sess *session, tblName, condition string, label string) ([]*BackfillJob, error) {
rows, err := sess.execute(context.Background(), fmt.Sprintf("select * from mysql.%s where %s", tblName, condition), label)
if err != nil {
Expand All @@ -647,15 +654,15 @@ func GetBackfillJobs(sess *session, tblName, condition string, label string) ([]
jobs := make([]*BackfillJob, 0, len(rows))
for _, row := range rows {
job := BackfillJob{
ID: row.GetInt64(0),
JobID: row.GetInt64(1),
EleID: row.GetInt64(2),
EleKey: row.GetBytes(3),
StoreID: row.GetInt64(4),
Tp: model.BackfillType(row.GetInt64(5)),
Instance_ID: row.GetString(6),
Instance_Lease: row.GetTime(7),
State: model.JobState(row.GetInt64(8)),
ID: row.GetInt64(0),
JobID: row.GetInt64(1),
EleID: row.GetInt64(2),
EleKey: row.GetBytes(3),
StoreID: row.GetInt64(4),
Tp: model.BackfillType(row.GetInt64(5)),
InstanceID: row.GetString(6),
InstanceLease: row.GetTime(7),
State: model.JobState(row.GetInt64(8)),
}
job.Mate = &model.BackfillMeta{}
err = job.Mate.Decode(row.GetBytes(9))
Expand All @@ -667,6 +674,8 @@ func GetBackfillJobs(sess *session, tblName, condition string, label string) ([]
return jobs, nil
}

// RemoveBackfillJob removes the backfill jobs from the tidb_ddl_backfill table.
// If isAll is all, removes all jobs in table. Otherwise, removes the backfillJob related job.
func RemoveBackfillJob(sess *session, isAll bool, backfillJob *BackfillJob) error {
sql := "delete from mysql.tidb_ddl_backfill"
if !isAll {
Expand All @@ -683,11 +692,12 @@ func updateBackfillJob(sess *session, backfillJob *BackfillJob, label string) er
return err
}
sql := fmt.Sprintf("update mysql.tidb_ddl_backfill set exec_id = '%s', exec_lease = '%s', state = %d, backfill_meta = '%s' where section_id = %d",
backfillJob.Instance_ID, backfillJob.Instance_Lease, backfillJob.State, mate, backfillJob.ID)
backfillJob.InstanceID, backfillJob.InstanceLease, backfillJob.State, mate, backfillJob.ID)
_, err = sess.execute(context.Background(), sql, label)
return err
}

// AddBackfillHistoryJob adds the backfill jobs to the tidb_ddl_backfill_history table.
func AddBackfillHistoryJob(sess *session, backfillJobs []*BackfillJob) error {
sqlPrefix := "insert into mysql.tidb_ddl_backfill_history(section_id, job_id, ele_id, ele_key, store_id, type, exec_id, exec_lease, state, backfill_meta) values"
var sql string
Expand All @@ -699,11 +709,11 @@ func AddBackfillHistoryJob(sess *session, backfillJobs []*BackfillJob) error {

if i == 0 {
sql = sqlPrefix + fmt.Sprintf("(%d, %d, %d, '%s', %d, %d, '%s', '%s', %d, '%s')",
bj.ID, bj.JobID, bj.EleID, bj.EleKey, bj.StoreID, bj.Tp, bj.Instance_ID, bj.Instance_Lease, bj.State, mateByte)
bj.ID, bj.JobID, bj.EleID, bj.EleKey, bj.StoreID, bj.Tp, bj.InstanceID, bj.InstanceLease, bj.State, mateByte)
continue
}
sql += fmt.Sprintf(", (%d, %d, %d, '%s', %d, %d, '%s', '%s', %d, '%s')",
bj.ID, bj.JobID, bj.EleID, bj.EleKey, bj.StoreID, bj.Tp, bj.Instance_ID, bj.Instance_Lease, bj.State, mateByte)
bj.ID, bj.JobID, bj.EleID, bj.EleKey, bj.StoreID, bj.Tp, bj.InstanceID, bj.InstanceLease, bj.State, mateByte)
}
_, err := sess.execute(context.Background(), sql, "add_backfill_history_job")
return err
Expand Down
21 changes: 11 additions & 10 deletions ddl/job_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,8 @@ func equalBackfillJob(t *testing.T, a, b *ddl.BackfillJob, lessTime types.Time)
require.Equal(t, a.EleID, b.EleID)
require.Equal(t, a.EleKey, b.EleKey)
require.Equal(t, a.StoreID, b.StoreID)
require.Equal(t, a.Instance_ID, b.Instance_ID)
require.GreaterOrEqual(t, b.Instance_Lease.Compare(lessTime), 0)
require.Equal(t, a.InstanceID, b.InstanceID)
require.GreaterOrEqual(t, b.InstanceLease.Compare(lessTime), 0)
require.Equal(t, a.State, b.State)
require.Equal(t, a.Mate, b.Mate)
}
Expand Down Expand Up @@ -263,7 +263,7 @@ func TestSimpleExecBackfillJobs(t *testing.T) {
bjTestCases = append(bjTestCases, bJobs1...)
bjTestCases = append(bjTestCases, bJobs2...)
err = ddl.AddBackfillJobs(se, bjTestCases)
// ID jobID eleID Instance_ID
// ID jobID eleID InstanceID
// -------------------------------------
// 0 jobID eleID1 uuid
// 1 jobID eleID1 ""
Expand All @@ -281,12 +281,12 @@ func TestSimpleExecBackfillJobs(t *testing.T) {
require.NoError(t, err)
require.Len(t, bJobs, 1)
bjRet := bjTestCases[0]
bjRet.Instance_ID = uuid
bjRet.InstanceID = uuid
equalBackfillJob(t, bjRet, bJobs[0], ddl.GetLeaseGoTime(previousTime, instanceLease))
currTime, err := ddl.GetOracleTime(se.GetStore())
require.NoError(t, err)
currGoTime := ddl.GetLeaseGoTime(currTime, instanceLease)
require.GreaterOrEqual(t, currGoTime.Compare(bJobs[0].Instance_Lease), 0)
require.GreaterOrEqual(t, currGoTime.Compare(bJobs[0].InstanceLease), 0)
allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID, eleID2), "test_get_bj")
require.NoError(t, err)
require.Equal(t, allCnt, cnt)
Expand Down Expand Up @@ -317,11 +317,12 @@ func TestSimpleExecBackfillJobs(t *testing.T) {

// test history backfill jobs
err = ddl.AddBackfillHistoryJob(se, []*ddl.BackfillJob{bJobs1[0]})
require.NoError(t, err)
// ID jobID eleID
// ------------------------
// 0 jobID eleID1
require.NoError(t, err)
currTime, err = ddl.GetOracleTime(se.GetStore())
require.NoError(t, err)
condition := fmt.Sprintf("exec_ID = '' or exec_lease < '%v' and job_id = %d order by job_id", currTime.Add(-instanceLease), jobID)
bJobs, err = ddl.GetBackfillJobs(se, ddl.BackfillHistoryTable, condition, "test_get_bj")
require.NoError(t, err)
Expand Down Expand Up @@ -363,8 +364,8 @@ func TestSimpleExecBackfillJobs(t *testing.T) {
equalBackfillJob(t, bJobs1[1], bJobs[1], types.ZeroTime)

// test the BackfillJob's AbbrStr
require.Equal(t, fmt.Sprintf("ID:2, JobID:3, EleID:4, Type:add index, State:rollingback, Instance_ID:%s, Instance_Lease:0000-00-00 00:00:00", uuid), bJobs1[0].AbbrStr())
require.Equal(t, "ID:3, JobID:3, EleID:4, Type:add index, State:cancelling, Instance_ID:, Instance_Lease:0000-00-00 00:00:00", bJobs1[1].AbbrStr())
require.Equal(t, "ID:0, JobID:3, EleID:5, Type:add index, State:none, Instance_ID:, Instance_Lease:0000-00-00 00:00:00", bJobs2[0].AbbrStr())
require.Equal(t, "ID:1, JobID:3, EleID:5, Type:add index, State:none, Instance_ID:, Instance_Lease:0000-00-00 00:00:00", bJobs2[1].AbbrStr())
require.Equal(t, fmt.Sprintf("ID:2, JobID:3, EleID:4, Type:add index, State:rollingback, InstanceID:%s, InstanceLease:0000-00-00 00:00:00", uuid), bJobs1[0].AbbrStr())
require.Equal(t, "ID:3, JobID:3, EleID:4, Type:add index, State:cancelling, InstanceID:, InstanceLease:0000-00-00 00:00:00", bJobs1[1].AbbrStr())
require.Equal(t, "ID:0, JobID:3, EleID:5, Type:add index, State:none, InstanceID:, InstanceLease:0000-00-00 00:00:00", bJobs2[0].AbbrStr())
require.Equal(t, "ID:1, JobID:3, EleID:5, Type:add index, State:none, InstanceID:, InstanceLease:0000-00-00 00:00:00", bJobs2[1].AbbrStr())
}
8 changes: 5 additions & 3 deletions meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,10 @@ const (

// DDLTableVersion1 is for support concurrent DDL, it added tidb_ddl_job, tidb_ddl_reorg and tidb_ddl_history.
DDLTableVersion1 = "1"
// DDLTableVersion2 is for support distributed reorg stage, it added tidb_ddl_backfill.
// DDLTableVersion2 is for support MDL tables.
DDLTableVersion2 = "2"
// DDLTableVersion3 is for support distributed reorg stage, it added tidb_ddl_backfill.
DDLTableVersion3 = "3"
)

var (
Expand Down Expand Up @@ -568,7 +570,7 @@ func (m *Meta) SetDDLTables(ddlTableVersion string) error {

// SetMDLTables write a key into storage.
func (m *Meta) SetMDLTables() error {
err := m.txn.Set(mDDLTableVersion, []byte("2"))
err := m.txn.Set(mDDLTableVersion, []byte(DDLTableVersion2))
return errors.Trace(err)
}

Expand Down Expand Up @@ -623,7 +625,7 @@ func (m *Meta) CheckMDLTableExists() (bool, error) {
if err != nil {
return false, errors.Trace(err)
}
return bytes.Equal(v, []byte("2")), nil
return bytes.Equal(v, []byte(DDLTableVersion2)), nil
}

// SetConcurrentDDL set the concurrent DDL flag.
Expand Down
15 changes: 10 additions & 5 deletions meta/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -842,16 +842,21 @@ func TestDDLTable(t *testing.T) {

m := meta.NewMeta(txn)

exists, err := m.CheckDDLTableExists()
ver, err := m.CheckDDLTableVersion()
require.NoError(t, err)
require.False(t, exists)
require.Equal(t, "", ver)

err = m.SetDDLTables()
err = m.SetMDLTables()
require.NoError(t, err)
ver, err = m.CheckDDLTableVersion()
require.NoError(t, err)
require.Equal(t, meta.DDLTableVersion2, ver)

exists, err = m.CheckDDLTableExists()
err = m.SetDDLTables(meta.DDLTableVersion3)
require.NoError(t, err)
ver, err = m.CheckDDLTableVersion()
require.NoError(t, err)
require.True(t, exists)
require.Equal(t, meta.DDLTableVersion3, ver)

err = m.SetConcurrentDDL(true)
require.NoError(t, err)
Expand Down
5 changes: 5 additions & 0 deletions parser/model/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,14 +387,17 @@ func (sub *SubJob) FromProxyJob(proxyJob *Job, ver int64) {
sub.SchemaVer = ver
}

// BackfillType indicates the backfill type.
type BackfillType byte

// List backfill types.
const (
TypeAddIndexBackfill BackfillType = 1
TypeUpdateColumnBackfill BackfillType = 2
TypeCleanUpIndexBackfill BackfillType = 3
)

// String implements fmt.Stringer interface.
func (bt BackfillType) String() string {
switch bt {
case TypeAddIndexBackfill:
Expand All @@ -408,6 +411,7 @@ func (bt BackfillType) String() string {
}
}

// JobMeta is meta info of Job.
type JobMeta struct {
SchemaID int64 `json:"schema_id"`
TableID int64 `json:"table_id"`
Expand All @@ -417,6 +421,7 @@ type JobMeta struct {
Priority int `json:"priority"`
}

// BackfillMeta is meta info of the backfill job.
type BackfillMeta struct {
CurrKey []byte `json:"curr_key"`
StartKey []byte `json:"start_key"`
Expand Down
6 changes: 3 additions & 3 deletions session/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,6 @@ func TestInitDDLJobTables(t *testing.T) {
defer func() {
ddl.DistReorgDisable()
}()
err = InitDDLJobTables(store)
require.NoError(t, err)
dom1, err := BootstrapSession(store)
require.NoError(t, err)
defer dom1.Close()
Expand Down Expand Up @@ -202,10 +200,12 @@ func TestBootstrapWithError(t *testing.T) {
se.txn.init()
se.mu.values = make(map[fmt.Stringer]interface{})
se.SetValue(sessionctx.Initing, true)
err := InitDDLJobTables(store)
err := InitDDLJobTables(store, meta.DDLTableVersion1)
require.NoError(t, err)
err = InitMDLTable(store)
require.NoError(t, err)
err = InitDDLJobTables(store, meta.DDLTableVersion3)
require.NoError(t, err)
dom, err := domap.Get(store)
require.NoError(t, err)
domain.BindDomain(se, dom)
Expand Down
Loading

0 comments on commit d7c6bc7

Please sign in to comment.