Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Add backfill job related tables and operations #38978

Closed
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,24 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/copr"
"github.com/pingcap/tidb/store/driver/backoff"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/logutil"
decoder "github.com/pingcap/tidb/util/rowDecoder"
"github.com/pingcap/tidb/util/timeutil"
"github.com/pingcap/tidb/util/topsql"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
atomicutil "go.uber.org/atomic"
"go.uber.org/zap"
)

Expand All @@ -56,8 +60,65 @@ const (
typeUpdateColumnWorker backfillWorkerType = 1
typeCleanUpIndexWorker backfillWorkerType = 2
typeAddIndexMergeTmpWorker backfillWorkerType = 3

// InstanceLease is the instance lease.
InstanceLease = 1 * time.Minute
)

// enableDistReorg indicates whether dist reorg is enabled. It is initialized to false (disabled).
// TODO: control the behavior
var enableDistReorg = atomicutil.NewBool(false)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have feature tags in sessionctx/variable/featuretag. We can test a feature both when it is enabled and when it is disabled by controlling the feature tag. I think it may be useful for you.

Benjamin2037 marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Collaborator

@Benjamin2037 Benjamin2037 Nov 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment says the default is enabled, so I think this should be true.


// DistReorgEnable switches dist reorg on by storing true into the
// package-level enableDistReorg flag. It is exported for testing.
func DistReorgEnable() {
	const enabled = true
	enableDistReorg.Store(enabled)
}

// DistReorgDisable switches dist reorg off by storing false into the
// package-level enableDistReorg flag. It is exported for testing.
func DistReorgDisable() {
	const enabled = false
	enableDistReorg.Store(enabled)
}

// IsDistReorgEnable reports the current value of the package-level
// enableDistReorg flag, i.e. whether dist reorg is enabled.
// It is exported for testing.
func IsDistReorgEnable() bool {
	enabled := enableDistReorg.Load()
	return enabled
}

// BackfillJob is for a tidb_ddl_backfill table's record.
//
// NOTE(review): the mapping between these fields and the table's columns is
// not visible in this file; the per-field notes below are assumptions to be
// confirmed against the code that reads and writes the table.
type BackfillJob struct {
// ID identifies this backfill job record (presumably the section_id column — TODO confirm).
ID int64
// JobID is presumably the ID of the DDL job this backfill job belongs to — TODO confirm.
JobID int64
// EleID and EleKey presumably identify the element being backfilled — TODO confirm.
EleID int64
EleKey []byte
// Tp is the backfill type.
Tp model.BackfillType
// State is the job state.
State model.JobState
// StoreID presumably maps to the store_id column — TODO confirm.
StoreID int64
// InstanceID and InstanceLease presumably record the executing instance and
// when its lease expires (exec_id / exec_lease columns) — TODO confirm.
InstanceID string
InstanceLease types.Time
// Mate holds the backfill meta.
// NOTE(review): the name looks like a typo for "Meta"; it is left unchanged
// because renaming an exported field would break callers.
Mate *model.BackfillMeta
}

// AbbrStr returns the BackfillJob's info without the Mate info.
func (bj *BackfillJob) AbbrStr() string {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Name it String or Str directly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this function only shows some ID related info, we don't use String here (and we're not sure if we need this function later).

return fmt.Sprintf("ID:%d, JobID:%d, EleID:%d, Type:%s, State:%s, InstanceID:%s, InstanceLease:%s",
bj.ID, bj.JobID, bj.EleID, bj.Tp, bj.State, bj.InstanceID, bj.InstanceLease)
}

// GetOracleTime asks store for its current version in the global txn scope
// and converts that version's timestamp into a time.Time with
// oracle.GetTimeFromTS.
//
// If the version cannot be fetched, the zero time.Time is returned together
// with the traced error.
func GetOracleTime(store kv.Storage) (time.Time, error) {
	ver, err := store.CurrentVersion(kv.GlobalTxnScope)
	if err != nil {
		var zero time.Time
		return zero, errors.Trace(err)
	}
	now := oracle.GetTimeFromTS(ver.Ver)
	return now, nil
}

// GetLeaseGoTime adds lease to currTime, converts the result to UTC, and
// wraps it in a types.Time built with mysql.TypeTimestamp and types.MaxFsp.
func GetLeaseGoTime(currTime time.Time, lease time.Duration) types.Time {
	expireAt := currTime.Add(lease).In(time.UTC)
	coreTime := types.FromGoTime(expireAt)
	return types.NewTime(coreTime, mysql.TypeTimestamp, types.MaxFsp)
}

// By now the DDL jobs that need backfilling include:
// 1: add-index
// 2: modify-column-type
Expand Down
34 changes: 34 additions & 0 deletions ddl/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ const (
ReorgTable = "tidb_ddl_reorg"
// HistoryTable stores the history DDL jobs.
HistoryTable = "tidb_ddl_history"
// BackfillTable stores the information of backfill jobs.
BackfillTable = "tidb_ddl_backfill"
// BackfillHistoryTable stores the information of history backfill jobs.
BackfillHistoryTable = "tidb_ddl_backfill_history"

// JobTableID is the table ID of `tidb_ddl_job`.
JobTableID = meta.MaxInt48 - 1
Expand All @@ -34,11 +38,41 @@ const (
HistoryTableID = meta.MaxInt48 - 3
// MDLTableID is the table ID of `tidb_mdl_info`.
MDLTableID = meta.MaxInt48 - 4
// BackfillTableID is the table ID of `tidb_ddl_backfill`.
BackfillTableID = meta.MaxInt48 - 5
// BackfillHistoryTableID is the table ID of `tidb_ddl_backfill_history`.
BackfillHistoryTableID = meta.MaxInt48 - 6

// JobTableSQL is the CREATE TABLE SQL of `tidb_ddl_job`.
JobTableSQL = "create table " + JobTable + "(job_id bigint not null, reorg int, schema_ids text(65535), table_ids text(65535), job_meta longblob, type int, processing int, primary key(job_id))"
// ReorgTableSQL is the CREATE TABLE SQL of `tidb_ddl_reorg`.
ReorgTableSQL = "create table " + ReorgTable + "(job_id bigint not null, ele_id bigint, ele_type blob, start_key blob, end_key blob, physical_id bigint, reorg_meta longblob, unique key(job_id, ele_id, ele_type(20)))"
// HistoryTableSQL is the CREATE TABLE SQL of `tidb_ddl_history`.
HistoryTableSQL = "create table " + HistoryTable + "(job_id bigint not null, job_meta longblob, db_name char(64), table_name char(64), schema_ids text(65535), table_ids text(65535), create_time datetime, primary key(job_id))"
// BackfillTableSQL is the CREATE TABLE SQL of `tidb_ddl_backfill`.
BackfillTableSQL = "create table " + BackfillTable + `(
section_id bigint not null,
job_id bigint not null,
ele_id bigint not null,
ele_key blob,
store_id bigint,
type int,
exec_id blob default null,
exec_lease Time,
state int,
backfill_meta longblob,
unique key(job_id, ele_id, ele_key(20), section_id))`
// BackfillHistoryTableSQL is the CREATE TABLE SQL of `tidb_ddl_backfill_history`.
BackfillHistoryTableSQL = "create table " + BackfillHistoryTable + `(
section_id bigint not null,
job_id bigint not null,
ele_id bigint not null,
ele_key blob,
store_id bigint,
type int,
exec_id blob default null,
exec_lease Time,
state int,
backfill_meta longblob,
unique key(job_id, ele_id, ele_key(20), section_id))`
)
17 changes: 9 additions & 8 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,7 @@ func (d *ddl) prepareWorkers4ConcurrencyDDL() {
return nil, err
}
sessForJob.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
wk.sess = newSession(sessForJob)
wk.sess = NewSession(sessForJob)
metrics.DDLCounter.WithLabelValues(fmt.Sprintf("%s_%s", metrics.CreateDDL, wk.String())).Inc()
return wk, nil
}
Expand Down Expand Up @@ -1231,7 +1231,7 @@ func (d *ddl) SwitchMDL(enable bool) error {
return err
}
defer d.sessPool.put(sess)
se := newSession(sess)
se := NewSession(sess)
rows, err := se.execute(ctx, "select 1 from mysql.tidb_ddl_job", "check job")
if err != nil {
return err
Expand Down Expand Up @@ -1349,7 +1349,7 @@ type Info struct {

// GetDDLInfoWithNewTxn returns DDL information using a new txn.
func GetDDLInfoWithNewTxn(s sessionctx.Context) (*Info, error) {
sess := newSession(s)
sess := NewSession(s)
err := sess.begin()
if err != nil {
return nil, err
Expand All @@ -1363,7 +1363,7 @@ func GetDDLInfoWithNewTxn(s sessionctx.Context) (*Info, error) {
func GetDDLInfo(s sessionctx.Context) (*Info, error) {
var err error
info := &Info{}
sess := newSession(s)
sess := NewSession(s)
txn, err := sess.txn()
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -1532,7 +1532,7 @@ func cancelConcurrencyJobs(se sessionctx.Context, ids []int64) ([]error, error)
}
var jobMap = make(map[int64]int) // jobID -> error index

sess := newSession(se)
sess := NewSession(se)
err := sess.begin()
if err != nil {
return nil, err
Expand Down Expand Up @@ -1614,7 +1614,7 @@ func getDDLJobsInQueue(t *meta.Meta, jobListKey meta.JobListKeyType) ([]*model.J
// GetAllDDLJobs get all DDL jobs and sorts jobs by job.ID.
func GetAllDDLJobs(sess sessionctx.Context, t *meta.Meta) ([]*model.Job, error) {
if variable.EnableConcurrentDDL.Load() {
return getJobsBySQL(newSession(sess), JobTable, "1 order by job_id")
return getJobsBySQL(NewSession(sess), JobTable, "1 order by job_id")
}

return getDDLJobs(t)
Expand Down Expand Up @@ -1700,7 +1700,8 @@ type session struct {
sessionctx.Context
}

func newSession(s sessionctx.Context) *session {
// NewSession wraps the given sessionctx.Context in a new *session.
// It is exported for testing.
func NewSession(s sessionctx.Context) *session {
	wrapped := &session{Context: s}
	return wrapped
}

Expand Down Expand Up @@ -1822,7 +1823,7 @@ func GetHistoryJobByID(sess sessionctx.Context, id int64) (*model.Job, error) {

// AddHistoryDDLJobForTest used for test.
func AddHistoryDDLJobForTest(sess sessionctx.Context, t *meta.Meta, job *model.Job, updateRawArgs bool) error {
return AddHistoryDDLJob(newSession(sess), t, job, updateRawArgs, variable.EnableConcurrentDDL.Load())
return AddHistoryDDLJob(NewSession(sess), t, job, updateRawArgs, variable.EnableConcurrentDDL.Load())
}

// AddHistoryDDLJob record the history job.
Expand Down
8 changes: 4 additions & 4 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ func (d *ddl) addBatchDDLJobs2Table(tasks []*limitJobTask) error {
return errors.Trace(err)
}
defer d.sessPool.put(sess)
job, err := getJobsBySQL(newSession(sess), JobTable, fmt.Sprintf("type = %d", model.ActionFlashbackCluster))
job, err := getJobsBySQL(NewSession(sess), JobTable, fmt.Sprintf("type = %d", model.ActionFlashbackCluster))
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -426,7 +426,7 @@ func (d *ddl) addBatchDDLJobs2Table(tasks []*limitJobTask) error {
injectModifyJobArgFailPoint(job)
}
sess.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
err = insertDDLJobs2Table(newSession(sess), true, jobTasks...)
err = insertDDLJobs2Table(NewSession(sess), true, jobTasks...)
}
return errors.Trace(err)
}
Expand Down Expand Up @@ -519,7 +519,7 @@ func cleanMDLInfo(pool *sessionPool, jobID int64) {
sql := fmt.Sprintf("delete from mysql.tidb_mdl_info where job_id = %d", jobID)
sctx, _ := pool.get()
defer pool.put(sctx)
sess := newSession(sctx)
sess := NewSession(sctx)
sess.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
_, err := sess.execute(context.Background(), sql, "delete-mdl-info")
if err != nil {
Expand All @@ -532,7 +532,7 @@ func checkMDLInfo(jobID int64, pool *sessionPool) (bool, error) {
sql := fmt.Sprintf("select * from mysql.tidb_mdl_info where job_id = %d", jobID)
sctx, _ := pool.get()
defer pool.put(sctx)
sess := newSession(sctx)
sess := NewSession(sctx)
rows, err := sess.execute(context.Background(), sql, "check-mdl-info")
if err != nil {
return false, err
Expand Down
Loading