diff --git a/ddl/column_test.go b/ddl/column_test.go
index 0b9ffc1b6e2ca..67f7e5ce1de44 100644
--- a/ddl/column_test.go
+++ b/ddl/column_test.go
@@ -61,8 +61,8 @@ func (s *testColumnSuite) TearDownSuite(c *C) {
 	testleak.AfterTest(c)()
 }
 
-func testCreateColumn(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo,
-	colName string, pos *ast.ColumnPosition, defaultValue interface{}) *model.Job {
+func buildCreateColumnJob(dbInfo *model.DBInfo, tblInfo *model.TableInfo, colName string,
+	pos *ast.ColumnPosition, defaultValue interface{}) *model.Job {
 	col := &model.ColumnInfo{
 		Name:   model.NewCIStr(colName),
 		Offset: len(tblInfo.Columns),
@@ -79,7 +79,12 @@ func testCreateColumn(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo
 		BinlogInfo: &model.HistoryInfo{},
 		Args:       []interface{}{col, pos, 0},
 	}
+	return job
+}
 
+func testCreateColumn(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo,
+	colName string, pos *ast.ColumnPosition, defaultValue interface{}) *model.Job {
+	job := buildCreateColumnJob(dbInfo, tblInfo, colName, pos, defaultValue)
 	err := d.doDDLJob(ctx, job)
 	c.Assert(err, IsNil)
 	v := getSchemaVer(c, ctx)
@@ -87,14 +92,18 @@ func testCreateColumn(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo
 	return job
 }
 
-func testDropColumn(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, colName string, isError bool) *model.Job {
-	job := &model.Job{
+func buildDropColumnJob(dbInfo *model.DBInfo, tblInfo *model.TableInfo, colName string) *model.Job {
+	return &model.Job{
 		SchemaID:   dbInfo.ID,
 		TableID:    tblInfo.ID,
 		Type:       model.ActionDropColumn,
 		BinlogInfo: &model.HistoryInfo{},
 		Args:       []interface{}{model.NewCIStr(colName)},
 	}
+}
+
+func testDropColumn(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, colName string, isError bool) *model.Job {
+	job := buildDropColumnJob(dbInfo, tblInfo, colName)
 	err := d.doDDLJob(ctx, job)
 	if isError {
 		c.Assert(err, NotNil)
diff --git a/ddl/db_change_test.go b/ddl/db_change_test.go
index b3d7f02ec90e1..907ded50ca487 100644
--- a/ddl/db_change_test.go
+++ b/ddl/db_change_test.go
@@ -28,13 +28,13 @@ import (
 	"github.com/pingcap/tidb/executor"
 	"github.com/pingcap/tidb/infoschema"
 	"github.com/pingcap/tidb/kv"
-	"github.com/pingcap/tidb/meta"
 	"github.com/pingcap/tidb/model"
 	"github.com/pingcap/tidb/parser"
 	"github.com/pingcap/tidb/session"
 	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/store/mockstore"
 	"github.com/pingcap/tidb/terror"
+	"github.com/pingcap/tidb/util/admin"
 	"github.com/pingcap/tidb/util/testkit"
 	"github.com/pingcap/tidb/util/testleak"
 	"golang.org/x/net/context"
@@ -580,15 +580,14 @@ func (s *testStateChangeSuite) testControlParallelExecSQL(c *C, sql1, sql2 strin
 		if times != 0 {
 			return
 		}
-		var qLen int64
-		var err1 error
+		var qLen int
 		for {
 			kv.RunInNewTxn(s.store, false, func(txn kv.Transaction) error {
-				m := meta.NewMeta(txn)
-				qLen, err1 = m.DDLJobQueueLen()
+				jobs, err1 := admin.GetDDLJobs(txn)
 				if err1 != nil {
 					return err1
 				}
+				qLen = len(jobs)
 				return nil
 			})
 			if qLen == 2 {
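The hunk above swaps the raw `meta` queue-length check for `admin.GetDDLJobs`, which (per the `util/admin/admin.go` hunk at the end of this diff) concatenates the general and add-index queues. A minimal sketch of that polling pattern, using only functions visible in this diff (the helper name `countQueuedDDLJobs` is illustrative, not part of the patch):

```go
import (
	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/util/admin"
)

// countQueuedDDLJobs reads the pending DDL jobs from both queues in one
// transaction and returns the combined count.
func countQueuedDDLJobs(store kv.Storage) (int, error) {
	var qLen int
	err := kv.RunInNewTxn(store, false, func(txn kv.Transaction) error {
		jobs, err1 := admin.GetDDLJobs(txn)
		if err1 != nil {
			return err1
		}
		qLen = len(jobs)
		return nil
	})
	return qLen, err
}
```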
diff --git a/ddl/ddl.go b/ddl/ddl.go
index 4fd023bce78df..9f27b07e51096 100644
--- a/ddl/ddl.go
+++ b/ddl/ddl.go
@@ -227,7 +227,7 @@ type ddl struct {
 	quitCh chan struct{}
 
 	*ddlCtx
-	workers []*worker
+	workers map[workerType]*worker
 }
 
 // ddlCtx is the context when we use worker to handle DDL jobs.
@@ -236,7 +236,6 @@ type ddlCtx struct {
 	store        kv.Storage
 	ownerManager owner.Manager
 	schemaSyncer SchemaSyncer
-	ddlJobCh     chan struct{}
 	ddlJobDoneCh chan struct{}
 	ddlEventCh   chan<- *util.Event
 	lease        time.Duration // lease is schema lease.
@@ -317,7 +316,6 @@ func newDDL(ctx context.Context, etcdCli *clientv3.Client, store kv.Storage,
 		uuid:         id,
 		store:        store,
 		lease:        lease,
-		ddlJobCh:     make(chan struct{}, 1),
 		ddlJobDoneCh: make(chan struct{}, 1),
 		ownerManager: manager,
 		schemaSyncer: syncer,
@@ -359,20 +357,20 @@ func (d *ddl) start(ctx context.Context, ctxPool *pools.ResourcePool) {
 		err := d.ownerManager.CampaignOwner(ctx)
 		terror.Log(errors.Trace(err))
 
-		d.workers = make([]*worker, 1)
-		// TODO: Add addIdxWorker.
-		d.workers[0] = newWorker(generalWorker, 0, d.store, ctxPool)
+		d.workers = make(map[workerType]*worker, 2)
+		d.workers[generalWorker] = newWorker(generalWorker, 0, d.store, ctxPool)
+		d.workers[addIdxWorker] = newWorker(addIdxWorker, 1, d.store, ctxPool)
 		for _, worker := range d.workers {
 			worker.wg.Add(1)
 			go worker.start(d.ddlCtx)
 			// TODO: Add the type of DDL worker.
 			metrics.DDLCounter.WithLabelValues(metrics.CreateDDLWorker).Inc()
+
+			// When the start function is called, we send a fake job to let the worker
+			// check the owner first and try to find whether a job exists to run.
+			asyncNotify(worker.ddlJobCh)
 		}
 	}
-
-	// For every start, we will send a fake job to let worker
-	// check owner firstly and try to find whether a job exists and run.
-	asyncNotify(d.ddlJobCh)
 }
 
 func (d *ddl) close() {
@@ -418,16 +416,15 @@ func (d *ddl) genGlobalID() (int64, error) {
 		globalID, err = meta.NewMeta(txn).GenGlobalID()
 		return errors.Trace(err)
 	})
+
 	return globalID, errors.Trace(err)
 }
 
-// generalWorker returns the first worker. The ddl structure has only one worker before we implement the parallel worker.
+// generalWorker returns the general worker.
 // It's used for testing.
+// TODO: Remove this function.
 func (d *ddl) generalWorker() *worker {
-	if len(d.workers) == 0 {
-		return nil
-	}
-	return d.workers[0]
+	return d.workers[generalWorker]
 }
 
 // SchemaSyncer implements DDL.SchemaSyncer interface.
@@ -449,6 +446,19 @@ func checkJobMaxInterval(job *model.Job) time.Duration {
 	return 1 * time.Second
 }
 
+func (d *ddl) asyncNotifyWorker(jobTp model.ActionType) {
+	// If the workers don't run, we needn't notify workers.
+	if !RunWorker {
+		return
+	}
+
+	if jobTp == model.ActionAddIndex {
+		asyncNotify(d.workers[addIdxWorker].ddlJobCh)
+	} else {
+		asyncNotify(d.workers[generalWorker].ddlJobCh)
+	}
+}
+
 func (d *ddl) doDDLJob(ctx sessionctx.Context, job *model.Job) error {
 	// For every DDL, we must commit current transaction.
 	if err := ctx.NewTxn(); err != nil {
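`asyncNotifyWorker` above routes a wake-up to whichever worker owns the job's queue. The body of `asyncNotify` itself is not part of this diff; given that each `ddlJobCh` is created with a buffer of one (see `newWorker` in ddl_worker.go below), it is presumably the usual non-blocking send, roughly:

```go
// A plausible sketch of asyncNotify (assumed, not shown in this patch):
// with a 1-slot buffer, one pending signal already guarantees the worker
// will re-scan its queue, so extra notifications can be dropped safely.
func asyncNotify(ch chan struct{}) {
	select {
	case ch <- struct{}{}:
	default:
	}
}
```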
@@ -463,7 +473,7 @@ func (d *ddl) doDDLJob(ctx sessionctx.Context, job *model.Job) error {
 	ctx.GetSessionVars().StmtCtx.IsDDLJobInQueue = true
 
 	// Notice worker that we push a new job and wait the job done.
-	asyncNotify(d.ddlJobCh)
+	d.asyncNotifyWorker(job.Type)
 	log.Infof("[ddl] start DDL job %s, Query:%s", job, job.Query)
 
 	var historyJob *model.Job
diff --git a/ddl/ddl_test.go b/ddl/ddl_test.go
index f07ce685ada6d..cc795ccae7ddb 100644
--- a/ddl/ddl_test.go
+++ b/ddl/ddl_test.go
@@ -134,8 +134,8 @@ func checkHistoryJobArgs(c *C, ctx sessionctx.Context, id int64, args *historyJo
 	}
 }
 
-func testCreateIndex(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, unique bool, indexName string, colName string) *model.Job {
-	job := &model.Job{
+func buildCreateIdxJob(dbInfo *model.DBInfo, tblInfo *model.TableInfo, unique bool, indexName string, colName string) *model.Job {
+	return &model.Job{
 		SchemaID:   dbInfo.ID,
 		TableID:    tblInfo.ID,
 		Type:       model.ActionAddIndex,
@@ -145,7 +145,10 @@ func testCreateIndex(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo,
 			Column: &ast.ColumnName{Name: model.NewCIStr(colName)},
 			Length: types.UnspecifiedLength}}},
 	}
+}
 
+func testCreateIndex(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, unique bool, indexName string, colName string) *model.Job {
+	job := buildCreateIdxJob(dbInfo, tblInfo, unique, indexName, colName)
 	err := d.doDDLJob(ctx, job)
 	c.Assert(err, IsNil)
 	v := getSchemaVer(c, ctx)
@@ -153,18 +156,31 @@ func testCreateIndex(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo,
 	return job
 }
 
-func testDropIndex(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, indexName string) *model.Job {
-	job := &model.Job{
+func buildDropIdxJob(dbInfo *model.DBInfo, tblInfo *model.TableInfo, indexName string) *model.Job {
+	return &model.Job{
 		SchemaID:   dbInfo.ID,
 		TableID:    tblInfo.ID,
 		Type:       model.ActionDropIndex,
 		BinlogInfo: &model.HistoryInfo{},
 		Args:       []interface{}{model.NewCIStr(indexName)},
 	}
+}
 
+func testDropIndex(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, indexName string) *model.Job {
+	job := buildDropIdxJob(dbInfo, tblInfo, indexName)
 	err := d.doDDLJob(ctx, job)
 	c.Assert(err, IsNil)
 	v := getSchemaVer(c, ctx)
 	checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo})
 	return job
 }
+
+func buildRebaseAutoIDJobJob(dbInfo *model.DBInfo, tblInfo *model.TableInfo, newBaseID int64) *model.Job {
+	return &model.Job{
+		SchemaID:   dbInfo.ID,
+		TableID:    tblInfo.ID,
+		Type:       model.ActionRebaseAutoID,
+		BinlogInfo: &model.HistoryInfo{},
+		Args:       []interface{}{newBaseID},
+	}
+}
diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go
index 51fd0537f5dd7..c31bd0caf19b1 100644
--- a/ddl/ddl_worker.go
+++ b/ddl/ddl_worker.go
@@ -38,20 +38,24 @@ var RunWorker = true
 type workerType byte
 
 const (
-	// generalWorker is the worker who handles all DDL worker now.
-	// TODO: update the comments when we support the addIdxWorker.
+	// generalWorker is the worker who handles all DDL statements except "add index".
 	generalWorker workerType = 0
-	addIdxWorker workerType = 1
+	// addIdxWorker is the worker who handles the operation of adding indexes.
+	addIdxWorker workerType = 1
+	// waitDependencyJobInterval is the interval to wait while the dependency job is not done.
+	waitDependencyJobInterval = 200 * time.Millisecond
+	// noneDependencyJob means a job has no dependency job.
+	noneDependencyJob = 0
 )
 
 // worker is used for handling DDL jobs.
-// Now we have two kinds of workers, but we only use the generalWorker.
-// TODO: update the comments when we support the addIdxWorker.
+// Now we have two kinds of workers.
 type worker struct {
-	id     int
-	tp     workerType
-	quitCh chan struct{}
-	wg     sync.WaitGroup
+	id       int
+	tp       workerType
+	ddlJobCh chan struct{}
+	quitCh   chan struct{}
+	wg       sync.WaitGroup
 
 	reorgCtx        *reorgCtx // reorgCtx is used for reorganization.
 	delRangeManager delRangeManager
@@ -61,6 +65,7 @@ func newWorker(tp workerType, id int, store kv.Storage, ctxPool *pools.ResourceP
 	worker := &worker{
 		id:       id,
 		tp:       tp,
+		ddlJobCh: make(chan struct{}, 1),
 		quitCh:   make(chan struct{}),
 		reorgCtx: &reorgCtx{notifyCancelReorgJob: 0},
 	}
@@ -74,17 +79,21 @@ func newWorker(tp workerType, id int, store kv.Storage, ctxPool *pools.ResourceP
 	return worker
 }
 
-func (w *worker) String() string {
+func (w *worker) typeStr() string {
 	var str string
 	switch w.tp {
 	case generalWorker:
 		str = "general"
 	case addIdxWorker:
-		str = "add index"
+		str = model.AddIndexStr
 	default:
 		str = "unknow"
 	}
-	return fmt.Sprintf("%d, tp %s", w.id, str)
+	return str
+}
+
+func (w *worker) String() string {
+	return fmt.Sprintf("%d, tp %s", w.id, w.typeStr())
 }
 
 func (w *worker) close() {
@@ -118,23 +127,18 @@ func (w *worker) start(d *ddlCtx) {
 		}
 	}()
 
-	// shouldCleanJobs is used to determine whether to clean up the job in adding index queue.
-	shouldCleanJobs := true
 	for {
 		select {
 		case <-ticker.C:
 			log.Debugf("[ddl] worker %s waits %s to check DDL status again", w, checkTime)
-		case <-d.ddlJobCh:
+		case <-w.ddlJobCh:
 		case <-w.quitCh:
 			return
 		}
 
-		err := w.handleDDLJobQueue(d, shouldCleanJobs)
+		err := w.handleDDLJobQueue(d)
 		if err != nil {
 			log.Errorf("[ddl] worker %s handles DDL job err %v", w, errors.ErrorStack(err))
-		} else if shouldCleanJobs {
-			log.Infof("[ddl] worker %s cleans jobs in the adding index queue finished.", w)
-			shouldCleanJobs = false
 		}
 	}
 }
@@ -149,10 +153,20 @@ func asyncNotify(ch chan struct{}) {
 // buildJobDependence sets the curjob's dependency-ID.
 // The dependency-job's ID must less than the current job's ID, and we need the largest one in the list.
 func buildJobDependence(t *meta.Meta, curJob *model.Job) error {
-	jobs, err := t.GetAllDDLJobs()
+	// Jobs in the same queue are ordered. If we want to find a job's dependency-job, we need to look for
+	// it in the other queue. So if the job is an "ActionAddIndex" job, we need to find its dependency-job in the DefaultJobList.
+	var jobs []*model.Job
+	var err error
+	switch curJob.Type {
+	case model.ActionAddIndex:
+		jobs, err = t.GetAllDDLJobsInQueue(meta.DefaultJobListKey)
+	default:
+		jobs, err = t.GetAllDDLJobsInQueue(meta.AddIndexJobListKey)
+	}
 	if err != nil {
 		return errors.Trace(err)
 	}
+
 	for _, job := range jobs {
 		if curJob.ID < job.ID {
 			continue
@@ -162,6 +176,7 @@ func buildJobDependence(t *meta.Meta, curJob *model.Job) error {
 			return errors.Trace(err)
 		}
 		if isDependent {
+			log.Infof("[ddl] current DDL job %v depends on job %v", curJob, job)
			curJob.DependencyID = job.ID
			break
		}
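To restate the rule implemented above: a job only examines the *opposite* queue (jobs within one queue already run in ID order), and it takes the dependent job with the largest ID smaller than its own. An order-independent sketch of the same rule (`pickDependencyID` is an illustrative name, not a function in this patch):

```go
// pickDependencyID scans the other queue and returns the ID of the newest
// earlier job that curJob must wait for, or 0 (noneDependencyJob) if none.
func pickDependencyID(curJob *model.Job, otherQueue []*model.Job) (int64, error) {
	dep := int64(0)
	for _, job := range otherQueue {
		if job.ID >= curJob.ID || job.ID <= dep {
			continue
		}
		isDependent, err := curJob.IsDependentOn(job)
		if err != nil {
			return 0, err
		}
		if isDependent {
			dep = job.ID
		}
	}
	return dep, nil
}
```

This matches the test below: add-index job 4 (table 1) picks job 2, the newest of the earlier general jobs touching table 1.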
@@ -175,14 +190,18 @@ func (d *ddl) addDDLJob(ctx sessionctx.Context, job *model.Job) error {
 	job.Version = currentVersion
 	job.Query, _ = ctx.Value(sessionctx.QueryString).(string)
 	err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
-		t := meta.NewMeta(txn)
+		t := newMetaWithQueueTp(txn, job.Type.String())
 		var err error
 		job.ID, err = t.GenGlobalID()
 		if err != nil {
 			return errors.Trace(err)
 		}
 		job.StartTS = txn.StartTS()
+		if err = buildJobDependence(t, job); err != nil {
+			return errors.Trace(err)
+		}
 		err = t.EnQueueDDLJob(job)
+
 		return errors.Trace(err)
 	})
 	metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerAddDDLJob, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
@@ -282,41 +301,61 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
 	return errors.Trace(err)
 }
 
+func isDependencyJobDone(t *meta.Meta, job *model.Job) (bool, error) {
+	if job.DependencyID == noneDependencyJob {
+		return true, nil
+	}
+
+	historyJob, err := t.GetHistoryDDLJob(job.DependencyID)
+	if err != nil {
+		return false, errors.Trace(err)
+	}
+	if historyJob == nil {
+		return false, nil
+	}
+	log.Infof("[ddl] current DDL job %v dependent job ID %d is finished", job, job.DependencyID)
+	job.DependencyID = noneDependencyJob
+	return true, nil
+}
+
+func newMetaWithQueueTp(txn kv.Transaction, tp string) *meta.Meta {
+	if tp == model.AddIndexStr {
+		return meta.NewMeta(txn, meta.AddIndexJobListKey)
+	}
+	return meta.NewMeta(txn)
+}
+
 // handleDDLJobQueue handles DDL jobs in DDL Job queue.
-// shouldCleanJobs is used to determine whether to clean up the job in adding index queue.
-func (w *worker) handleDDLJobQueue(d *ddlCtx, shouldCleanJobs bool) error {
+func (w *worker) handleDDLJobQueue(d *ddlCtx) error {
 	once := true
+	waitDependencyJobCnt := 0
 	for {
 		if isChanClosed(w.quitCh) {
 			return nil
 		}
 
-		waitTime := 2 * d.lease
-
 		var (
 			job       *model.Job
 			schemaVer int64
 			runJobErr error
 		)
+		waitTime := 2 * d.lease
 		err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
 			// We are not owner, return and retry checking later.
 			if !d.isOwner() {
 				return nil
 			}
 
-			// It's used for clean up the job in adding index queue before we support adding index queue.
-			// TODO: Remove this logic after we support the adding index queue.
-			if shouldCleanJobs {
-				return errors.Trace(w.cleanAddIndexQueueJobs(d, txn))
-			}
-
 			var err error
-			t := meta.NewMeta(txn)
+			t := newMetaWithQueueTp(txn, w.typeStr())
 			// We become the owner. Get the first job and run it.
 			job, err = w.getFirstDDLJob(t)
 			if job == nil || err != nil {
 				return errors.Trace(err)
 			}
 
+			if isDone, err1 := isDependencyJobDone(t, job); err1 != nil || !isDone {
+				return errors.Trace(err1)
+			}
+
 			if once {
 				w.waitSchemaSynced(d, job, waitTime)
@@ -362,6 +401,7 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx, shouldCleanJobs bool) error {
 			// No job now, return and retry getting later.
 			return nil
 		}
+		w.waitDependencyJobFinished(job, &waitDependencyJobCnt)
 
 		d.mu.RLock()
 		d.mu.hook.OnJobUpdated(job)
@@ -379,6 +419,21 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx, shouldCleanJobs bool) error {
 	}
 }
 
+// waitDependencyJobFinished waits for the dependency-job to be finished.
+// If the dependency job isn't finished yet, we'd better wait a moment.
+func (w *worker) waitDependencyJobFinished(job *model.Job, cnt *int) {
+	if job.DependencyID != noneDependencyJob {
+		intervalCnt := int(3 * time.Second / waitDependencyJobInterval)
+		if *cnt%intervalCnt == 0 {
+			log.Infof("[ddl] worker %s job %d needs to wait dependency job %d, sleeps a while:%v then retries it.", w, job.ID, job.DependencyID, waitDependencyJobInterval)
+		}
+		time.Sleep(waitDependencyJobInterval)
+		*cnt++
+	} else {
+		*cnt = 0
+	}
+}
+
 func chooseLeaseTime(t, max time.Duration) time.Duration {
 	if t == 0 || t > max {
 		return max
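The arithmetic above gives `intervalCnt = 3s / 200ms = 15`: the worker sleeps 200ms per retry but logs only on every 15th attempt, i.e. roughly once per three seconds of waiting. The same throttled-retry idiom in a generic, self-contained form (all names here are illustrative, not from the patch):

```go
import (
	"log"
	"time"
)

// waitUntil polls cond every interval and logs progress only every
// logEvery attempts, so a long wait does not flood the log.
func waitUntil(cond func() bool, interval time.Duration, logEvery int) {
	for i := 0; !cond(); i++ {
		if i%logEvery == 0 {
			log.Printf("still waiting (attempt %d)", i)
		}
		time.Sleep(interval)
	}
}
```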
@@ -587,69 +642,6 @@ func updateSchemaVersion(t *meta.Meta, job *model.Job) (int64, error) {
 	return schemaVersion, errors.Trace(err)
 }
 
-// cleanAddIndexQueueJobs cleans jobs in adding index queue.
-// It's only done once after the worker become the owner.
-// TODO: Remove this logic after we support the adding index queue.
-func (w *worker) cleanAddIndexQueueJobs(d *ddlCtx, txn kv.Transaction) error {
-	startTime := time.Now()
-	m := meta.NewMeta(txn)
-	m.SetJobListKey(meta.AddIndexJobListKey)
-	for {
-		job, err := w.getFirstDDLJob(m)
-		if err != nil {
-			return errors.Trace(err)
-		}
-		if job == nil {
-			log.Infof("[ddl] cleaning jobs in the adding index queue takes time %v.", time.Since(startTime))
-			return nil
-		}
-		log.Infof("[ddl] cleaning job %v in the adding index queue.", job)
-
-		// The types of these jobs must be ActionAddIndex.
-		if job.SchemaState == model.StatePublic || job.SchemaState == model.StateNone {
-			if job.SchemaState == model.StateNone {
-				job.State = model.JobStateCancelled
-			} else {
-				binloginfo.SetDDLBinlog(d.binlogCli, txn, job.ID, job.Query)
-				job.State = model.JobStateSynced
-			}
-			err = w.finishDDLJob(m, job)
-			if err != nil {
-				return errors.Trace(err)
-			}
-			continue
-		}
-
-		// When the job not in "none" and "public" state, we need to rollback it.
-		schemaID := job.SchemaID
-		tblInfo, err := getTableInfo(m, job, schemaID)
-		if err != nil {
-			return errors.Trace(err)
-		}
-		var indexName model.CIStr
-		var unique bool
-		err = job.DecodeArgs(&unique, &indexName)
-		if err != nil {
-			return errors.Trace(err)
-		}
-		indexInfo := findIndexByName(indexName.L, tblInfo.Indices)
-		_, err = convert2RollbackJob(m, job, tblInfo, indexInfo, nil)
-		if err == nil {
-			_, err = m.DeQueueDDLJob()
-		}
-		if err != nil {
-			return errors.Trace(err)
-		}
-		// Put the job to the default job list.
-		m.SetJobListKey(meta.DefaultJobListKey)
-		err = m.EnQueueDDLJob(job)
-		m.SetJobListKey(meta.AddIndexJobListKey)
-		if err != nil {
-			return errors.Trace(err)
-		}
-	}
-}
-
 func isChanClosed(quitCh chan struct{}) bool {
 	select {
 	case <-quitCh:
diff --git a/ddl/ddl_worker_test.go b/ddl/ddl_worker_test.go
index f46a175a3ae67..4527a26a99ad1 100644
--- a/ddl/ddl_worker_test.go
+++ b/ddl/ddl_worker_test.go
@@ -14,6 +14,7 @@
 package ddl
 
 import (
+	"sync"
 	"time"
 
 	"github.com/juju/errors"
@@ -69,26 +70,7 @@ func (s *testDDLSuite) TestRunWorker(c *C) {
 	d := testNewDDL(context.Background(), nil, store, nil, nil, testLease)
 	testCheckOwner(c, d, false)
 	defer d.Stop()
-	ctx := testNewContext(d)
-
-	dbInfo := testSchemaInfo(c, d, "test")
-	job := &model.Job{
-		SchemaID:   dbInfo.ID,
-		Type:       model.ActionCreateSchema,
-		BinlogInfo: &model.HistoryInfo{},
-		Args:       []interface{}{dbInfo},
-	}
-
-	exitCh := make(chan struct{})
-	go func(ch chan struct{}) {
-		err := d.doDDLJob(ctx, job)
-		c.Assert(err, IsNil)
-		close(ch)
-	}(exitCh)
-	// Make sure the DDL job is in the DDL job queue.
-	// The reason for doing it twice is to eliminate the operation in the start function.
-	<-d.ddlJobCh
-	<-d.ddlJobCh
+
 	// Make sure the DDL worker is nil.
 	worker := d.generalWorker()
 	c.Assert(worker, IsNil)
@@ -97,125 +79,8 @@ func (s *testDDLSuite) TestRunWorker(c *C) {
 	d1 := testNewDDL(context.Background(), nil, store, nil, nil, testLease)
 	testCheckOwner(c, d1, true)
 	defer d1.Stop()
-	asyncNotify(d1.ddlJobCh)
-	<-exitCh
-}
-
-func (s *testDDLSuite) TestCleanJobs(c *C) {
-	defer testleak.AfterTest(c)()
-	store := testCreateStore(c, "test_clean_jobs")
-	defer store.Close()
-	d := testNewDDL(context.Background(), nil, store, nil, nil, testLease)
-
-	ctx := testNewContext(d)
-	dbInfo := testSchemaInfo(c, d, "test")
-	testCreateSchema(c, ctx, d, dbInfo)
-	tblInfo := testTableInfo(c, d, "t", 2)
-	testCreateTable(c, ctx, d, dbInfo, tblInfo)
-
-	var failedJobIDs []int64
-	job := &model.Job{
-		SchemaID:   dbInfo.ID,
-		TableID:    tblInfo.ID,
-		Type:       model.ActionAddIndex,
-		BinlogInfo: &model.HistoryInfo{},
-	}
-	idxColNames := []*ast.IndexColName{{
-		Column: &ast.ColumnName{Name: model.NewCIStr("c1")},
-		Length: types.UnspecifiedLength}}
-	// Add some adding index jobs to AddIndexJobList.
-	backfillAddIndexJob := func(jobArgs []interface{}) {
-		kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
-			var err error
-			t := meta.NewMeta(txn)
-			t.SetJobListKey(meta.AddIndexJobListKey)
-			job.ID, err = t.GenGlobalID()
-			c.Assert(err, IsNil)
-			failedJobIDs = append(failedJobIDs, job.ID)
-			job.Args = jobArgs
-			err = t.EnQueueDDLJob(job)
-			c.Assert(err, IsNil)
-			return nil
-		})
-	}
-
-	// Add a StateNone job.
-	indexName := model.NewCIStr("idx_none")
-	args := []interface{}{false, indexName, idxColNames, nil}
-	backfillAddIndexJob(args)
-	// Add a StateDeleteOnly job.
-	indexName = model.NewCIStr("idx_delete_only")
-	args = []interface{}{false, indexName, idxColNames, nil}
-	backfillAddIndexJob(args)
-	changeJobState := func() {
-		kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
-			t := meta.NewMeta(txn)
-			t.SetJobListKey(meta.AddIndexJobListKey)
-			lastJobID := int64(len(failedJobIDs) - 1)
-			job, err1 := t.GetDDLJob(lastJobID)
-			c.Assert(err1, IsNil)
-			_, err1 = d.generalWorker().runDDLJob(d.ddlCtx, t, job)
-			c.Assert(err1, IsNil)
-			_, err1 = updateSchemaVersion(t, job)
-			c.Assert(err1, IsNil)
-			err1 = t.UpdateDDLJob(lastJobID, job, true)
-			c.Assert(err1, IsNil)
-			return nil
-		})
-		err := d.callHookOnChanged(nil)
-		c.Assert(err, IsNil)
-	}
-	changeJobState()
-
-	// Add a StateWriteReorganization job.
-	indexName = model.NewCIStr("idx_write_reorg")
-	args = []interface{}{false, indexName, idxColNames, nil}
-	backfillAddIndexJob(args)
-	changeJobState() // convert to delete only
-	changeJobState() // convert to write only
-	changeJobState() // convert to write reorg
-
-	err := d.Stop()
-	c.Assert(err, IsNil)
-	// Make sure shouldCleanJobs is ture.
-	d = testNewDDL(context.Background(), nil, store, nil, nil, testLease)
-	defer d.Stop()
-
-	// Make sure all DDL jobs are done.
-	for {
-		var isAllJobDone bool
-		kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
-			t := meta.NewMeta(txn)
-			len, err := t.DDLJobQueueLen()
-			c.Assert(err, IsNil)
-			t.SetJobListKey(meta.AddIndexJobListKey)
-			addIndexLen, err := t.DDLJobQueueLen()
-			c.Assert(err, IsNil)
-			if len == 0 && addIndexLen == 0 {
-				isAllJobDone = true
-			}
-			return nil
-		})
-		if isAllJobDone {
-			break
-		}
-		time.Sleep(time.Millisecond)
-	}
-
-	// Check that the jobs in add index list are finished.
-	kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
-		t := meta.NewMeta(txn)
-		for i, id := range failedJobIDs {
-			historyJob, err := t.GetHistoryDDLJob(id)
-			c.Assert(err, IsNil)
-			c.Assert(historyJob, NotNil, Commentf("job %v", historyJob))
-			if i == 0 {
-				c.Assert(historyJob.State, Equals, model.JobStateCancelled)
-			} else {
-				c.Assert(historyJob.State, Equals, model.JobStateRollbackDone)
-			}
-		}
-		return nil
-	})
+	worker = d1.generalWorker()
+	c.Assert(worker, NotNil)
 }
 
 func (s *testDDLSuite) TestSchemaError(c *C) {
@@ -585,15 +450,17 @@ func (s *testDDLSuite) TestIgnorableSpec(c *C) {
 }
 
 func (s *testDDLSuite) TestBuildJobDependence(c *C) {
-	defer testleak.AfterTest(c)()
 	store := testCreateStore(c, "test_set_job_relation")
 	defer store.Close()
 
-	job1 := &model.Job{ID: 1, TableID: 1}
-	job2 := &model.Job{ID: 2, TableID: 1}
-	job3 := &model.Job{ID: 3, TableID: 2}
-	job6 := &model.Job{ID: 6, TableID: 1}
-	job7 := &model.Job{ID: 7, TableID: 2}
+	// Add some non-add-index jobs.
+	job1 := &model.Job{ID: 1, TableID: 1, Type: model.ActionAddColumn}
+	job2 := &model.Job{ID: 2, TableID: 1, Type: model.ActionCreateTable}
+	job3 := &model.Job{ID: 3, TableID: 2, Type: model.ActionDropColumn}
+	job6 := &model.Job{ID: 6, TableID: 1, Type: model.ActionDropTable}
+	job7 := &model.Job{ID: 7, TableID: 2, Type: model.ActionModifyColumn}
+	job9 := &model.Job{ID: 9, SchemaID: 111, Type: model.ActionDropSchema}
+	job11 := &model.Job{ID: 11, TableID: 2, Type: model.ActionRenameTable, Args: []interface{}{int64(111), "old db name"}}
 	kv.RunInNewTxn(store, false, func(txn kv.Transaction) error {
 		t := meta.NewMeta(txn)
 		err := t.EnQueueDDLJob(job1)
@@ -606,9 +473,13 @@ func (s *testDDLSuite) TestBuildJobDependence(c *C) {
 		c.Assert(err, IsNil)
 		err = t.EnQueueDDLJob(job7)
 		c.Assert(err, IsNil)
+		err = t.EnQueueDDLJob(job9)
+		c.Assert(err, IsNil)
+		err = t.EnQueueDDLJob(job11)
+		c.Assert(err, IsNil)
 		return nil
 	})
-	job4 := &model.Job{ID: 4, TableID: 1}
+	job4 := &model.Job{ID: 4, TableID: 1, Type: model.ActionAddIndex}
 	kv.RunInNewTxn(store, false, func(txn kv.Transaction) error {
 		t := meta.NewMeta(txn)
 		err := buildJobDependence(t, job4)
@@ -616,7 +487,7 @@ func (s *testDDLSuite) TestBuildJobDependence(c *C) {
 		c.Assert(job4.DependencyID, Equals, int64(2))
 		return nil
 	})
-	job5 := &model.Job{ID: 5, TableID: 2}
+	job5 := &model.Job{ID: 5, TableID: 2, Type: model.ActionAddIndex}
 	kv.RunInNewTxn(store, false, func(txn kv.Transaction) error {
 		t := meta.NewMeta(txn)
 		err := buildJobDependence(t, job5)
@@ -624,7 +495,7 @@ func (s *testDDLSuite) TestBuildJobDependence(c *C) {
 		c.Assert(job5.DependencyID, Equals, int64(3))
 		return nil
 	})
-	job8 := &model.Job{ID: 8, TableID: 3}
+	job8 := &model.Job{ID: 8, TableID: 3, Type: model.ActionAddIndex}
 	kv.RunInNewTxn(store, false, func(txn kv.Transaction) error {
 		t := meta.NewMeta(txn)
 		err := buildJobDependence(t, job8)
@@ -632,4 +503,210 @@ func (s *testDDLSuite) TestBuildJobDependence(c *C) {
 		c.Assert(job8.DependencyID, Equals, int64(0))
 		return nil
 	})
+	job10 := &model.Job{ID: 10, SchemaID: 111, TableID: 3, Type: model.ActionAddIndex}
+	kv.RunInNewTxn(store, false, func(txn kv.Transaction) error {
+		t := meta.NewMeta(txn)
+		err := buildJobDependence(t, job10)
+		c.Assert(err, IsNil)
+		c.Assert(job10.DependencyID, Equals, int64(9))
+		return nil
+	})
+	job12 := &model.Job{ID: 12, SchemaID: 112, TableID: 2, Type: model.ActionAddIndex}
+	kv.RunInNewTxn(store, false, func(txn kv.Transaction) error {
+		t := meta.NewMeta(txn)
+		err := buildJobDependence(t, job12)
+		c.Assert(err, IsNil)
+		c.Assert(job12.DependencyID, Equals, int64(11))
+		return nil
+	})
+}
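Two of the new assertions encode less obvious dependencies: job10 (add index, SchemaID 111) waits for job9 (drop schema 111) even though no table IDs match, and job12 (SchemaID 112) waits for rename-table job11 because both touch TableID 2 — table IDs are global, so the current schema IDs need not agree. `IsDependentOn` (only partially shown in the model/ddl.go hunk below) performs these checks; a simplified sketch that is merely consistent with these test cases, not the actual implementation:

```go
// isDependent illustrates the relation the test exercises: schema-wide
// jobs conflict on SchemaID, everything else conflicts on TableID.
func isDependent(cur, prev *model.Job) bool {
	if prev.Type == model.ActionDropSchema || prev.Type == model.ActionCreateSchema {
		return cur.SchemaID == prev.SchemaID
	}
	return cur.TableID == prev.TableID
}
```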
+
+func (s *testDDLSuite) TestParallelDDL(c *C) {
+	store := testCreateStore(c, "test_parallel_ddl")
+	defer store.Close()
+	d := testNewDDL(context.Background(), nil, store, nil, nil, testLease)
+	defer d.Stop()
+	ctx := testNewContext(d)
+	err := ctx.NewTxn()
+	c.Assert(err, IsNil)
+
+	/*
+		build structure:
+			DBs -> {
+				db1: test_parallel_ddl_1
+				db2: test_parallel_ddl_2
+			}
+			Tables -> {
+				db1.t1 (c1 int, c2 int)
+				db1.t2 (c1 int primary key, c2 int, c3 int)
+				db2.t3 (c1 int, c2 int, c3 int, c4 int)
+			}
+			Data -> {
+				t1: (1, 1), (2, 2)
+				t2: (1, 1, 1), (2, 2, 2), (3, 3, 3)
+				t3: (11, 22, 33, 44)
+			}
+	*/
+	// create database test_parallel_ddl_1;
+	dbInfo1 := testSchemaInfo(c, d, "test_parallel_ddl_1")
+	testCreateSchema(c, ctx, d, dbInfo1)
+	// create table t1 (c1 int, c2 int);
+	tblInfo1 := testTableInfo(c, d, "t1", 2)
+	testCreateTable(c, ctx, d, dbInfo1, tblInfo1)
+	// insert t1 values (1, 1), (2, 2)
+	tbl1 := testGetTable(c, d, dbInfo1.ID, tblInfo1.ID)
+	_, err = tbl1.AddRecord(ctx, types.MakeDatums(1, 1), false)
+	c.Assert(err, IsNil)
+	_, err = tbl1.AddRecord(ctx, types.MakeDatums(2, 2), false)
+	c.Assert(err, IsNil)
+	// create table t2 (c1 int primary key, c2 int, c3 int);
+	tblInfo2 := testTableInfo(c, d, "t2", 3)
+	tblInfo2.Columns[0].Flag = mysql.PriKeyFlag | mysql.NotNullFlag
+	tblInfo2.PKIsHandle = true
+	testCreateTable(c, ctx, d, dbInfo1, tblInfo2)
+	// insert t2 values (1, 1, 1), (2, 2, 2), (3, 3, 3)
+	tbl2 := testGetTable(c, d, dbInfo1.ID, tblInfo2.ID)
+	_, err = tbl2.AddRecord(ctx, types.MakeDatums(1, 1, 1), false)
+	c.Assert(err, IsNil)
+	_, err = tbl2.AddRecord(ctx, types.MakeDatums(2, 2, 2), false)
+	c.Assert(err, IsNil)
+	_, err = tbl2.AddRecord(ctx, types.MakeDatums(3, 3, 3), false)
+	c.Assert(err, IsNil)
+	// create database test_parallel_ddl_2;
+	dbInfo2 := testSchemaInfo(c, d, "test_parallel_ddl_2")
+	testCreateSchema(c, ctx, d, dbInfo2)
+	// create table t3 (c1 int, c2 int, c3 int, c4 int);
+	tblInfo3 := testTableInfo(c, d, "t3", 4)
+	testCreateTable(c, ctx, d, dbInfo2, tblInfo3)
+	// insert t3 values (11, 22, 33, 44)
+	tbl3 := testGetTable(c, d, dbInfo2.ID, tblInfo3.ID)
+	_, err = tbl3.AddRecord(ctx, types.MakeDatums(11, 22, 33, 44), false)
+	c.Assert(err, IsNil)
+
+	// set hook to execute jobs after all jobs are in the queues.
+	jobCnt := int64(11)
+	tc := &TestDDLCallback{}
+	once := sync.Once{}
+	var checkErr error
+	tc.onJobRunBefore = func(job *model.Job) {
+		// TODO: extract a unified function for other tests.
+		once.Do(func() {
+			qLen1 := int64(0)
+			qLen2 := int64(0)
+			for {
+				checkErr = kv.RunInNewTxn(store, false, func(txn kv.Transaction) error {
+					m := meta.NewMeta(txn)
+					qLen1, err = m.DDLJobQueueLen()
+					if err != nil {
+						return err
+					}
+					qLen2, err = m.DDLJobQueueLen(meta.AddIndexJobListKey)
+					if err != nil {
+						return err
+					}
+					return nil
+				})
+				if checkErr != nil {
+					break
+				}
+				if qLen1+qLen2 == jobCnt {
+					if qLen2 != 5 {
+						checkErr = errors.Errorf("add index jobs cnt %v != 5", qLen2)
+					}
+					break
+				}
+				time.Sleep(5 * time.Millisecond)
+			}
+		})
+	}
+	d.SetHook(tc)
+	c.Assert(checkErr, IsNil)
+
+	/*
+		prepare jobs:
+		/	job no.	/	database no.	/	table no.	/	action type	/
+		/	1	/	1	/	1	/	add index	/
+		/	2	/	1	/	1	/	add column	/
+		/	3	/	1	/	1	/	add index	/
+		/	4	/	1	/	2	/	drop column	/
+		/	5	/	1	/	1	/	drop index	/
+		/	6	/	1	/	2	/	add index	/
+		/	7	/	2	/	3	/	drop column	/
+		/	8	/	2	/	3	/	rebase autoID	/
+		/	9	/	1	/	1	/	add index	/
+		/	10	/	2	/	null	/	drop schema	/
+		/	11	/	2	/	3	/	add index	/
+	*/
+	job1 := buildCreateIdxJob(dbInfo1, tblInfo1, false, "db1_idx1", "c1")
+	d.addDDLJob(ctx, job1)
+	job2 := buildCreateColumnJob(dbInfo1, tblInfo1, "c3", &ast.ColumnPosition{Tp: ast.ColumnPositionNone}, nil)
+	d.addDDLJob(ctx, job2)
+	job3 := buildCreateIdxJob(dbInfo1, tblInfo1, false, "db1_idx2", "c3")
+	d.addDDLJob(ctx, job3)
+	job4 := buildDropColumnJob(dbInfo1, tblInfo2, "c3")
+	d.addDDLJob(ctx, job4)
+	job5 := buildDropIdxJob(dbInfo1, tblInfo1, "db1_idx1")
+	d.addDDLJob(ctx, job5)
+	job6 := buildCreateIdxJob(dbInfo1, tblInfo2, false, "db2_idx1", "c2")
+	d.addDDLJob(ctx, job6)
+	job7 := buildDropColumnJob(dbInfo2, tblInfo3, "c4")
+	d.addDDLJob(ctx, job7)
+	job8 := buildRebaseAutoIDJobJob(dbInfo2, tblInfo3, 1024)
+	d.addDDLJob(ctx, job8)
+	job9 := buildCreateIdxJob(dbInfo1, tblInfo1, false, "db1_idx3", "c2")
+	d.addDDLJob(ctx, job9)
+	job10 := buildDropSchemaJob(dbInfo2)
+	d.addDDLJob(ctx, job10)
+	job11 := buildCreateIdxJob(dbInfo2, tblInfo3, false, "db3_idx1", "c2")
+	d.addDDLJob(ctx, job11)
+	// TODO: add rename table job
+
+	// check results.
+	isChecked := false
+	for !isChecked {
+		kv.RunInNewTxn(store, false, func(txn kv.Transaction) error {
+			m := meta.NewMeta(txn)
+			lastJob, err := m.GetHistoryDDLJob(job11.ID)
+			c.Assert(err, IsNil)
+			// all jobs are finished.
+			if lastJob != nil {
+				finishedJobs, err := m.GetAllHistoryDDLJobs()
+				c.Assert(err, IsNil)
+				// get the last 11 finished jobs.
+				finishedJobs = finishedJobs[len(finishedJobs)-11:]
+				// check that some jobs are ordered because of the dependence.
+				c.Assert(finishedJobs[0].ID, Equals, job1.ID)
+				c.Assert(finishedJobs[1].ID, Equals, job2.ID)
+				c.Assert(finishedJobs[2].ID, Equals, job3.ID)
+				c.Assert(finishedJobs[4].ID, Equals, job5.ID)
+				c.Assert(finishedJobs[10].ID, Equals, job11.ID)
+				// check the jobs are ordered in the adding-index-job queue or general-job queue.
+				addIdxJobID := int64(0)
+				generalJobID := int64(0)
+				for _, job := range finishedJobs {
+					// check jobs' order.
+					if job.Type == model.ActionAddIndex {
+						c.Assert(job.ID, Greater, addIdxJobID)
+						addIdxJobID = job.ID
+					} else {
+						c.Assert(job.ID, Greater, generalJobID)
+						generalJobID = job.ID
+					}
+					// check jobs' state.
+					if job.ID == lastJob.ID {
+						c.Assert(job.State, Equals, model.JobStateCancelled, Commentf("job: %v", job))
+					} else {
+						c.Assert(job.State, Equals, model.JobStateSynced, Commentf("job: %v", job))
+					}
+				}
+
+				isChecked = true
+			}
+			return nil
+		})
+		time.Sleep(10 * time.Millisecond)
+	}
+
+	tc = &TestDDLCallback{}
+	d.SetHook(tc)
 }
diff --git a/ddl/schema_test.go b/ddl/schema_test.go
index 12dc81bd3d3fa..159f0d3cd95ad 100644
--- a/ddl/schema_test.go
+++ b/ddl/schema_test.go
@@ -69,15 +69,18 @@ func testCreateSchema(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo
 	return job
 }
 
-func testDropSchema(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo) (*model.Job, int64) {
-	job := &model.Job{
+func buildDropSchemaJob(dbInfo *model.DBInfo) *model.Job {
+	return &model.Job{
 		SchemaID:   dbInfo.ID,
 		Type:       model.ActionDropSchema,
 		BinlogInfo: &model.HistoryInfo{},
 	}
+}
+
+func testDropSchema(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo) (*model.Job, int64) {
+	job := buildDropSchemaJob(dbInfo)
 	err := d.doDDLJob(ctx, job)
 	c.Assert(err, IsNil)
-
 	ver := getSchemaVer(c, ctx)
 	return job, ver
 }
diff --git a/meta/meta.go b/meta/meta.go
index 0c0b4309b8e8d..2c5ed10563c81 100644
--- a/meta/meta.go
+++ b/meta/meta.go
@@ -86,11 +86,19 @@ type Meta struct {
 }
 
 // NewMeta creates a Meta in transaction txn.
-func NewMeta(txn kv.Transaction) *Meta {
+// If the current Meta needs to handle a job, jobListKey is the type of the job's list.
+func NewMeta(txn kv.Transaction, jobListKeys ...JobListKeyType) *Meta {
 	txn.SetOption(kv.Priority, kv.PriorityHigh)
 	txn.SetOption(kv.SyncLog, true)
 	t := structure.NewStructure(txn, txn, mMetaPrefix)
-	return &Meta{txn: t, StartTS: txn.StartTS(), jobListKey: DefaultJobListKey}
+	listKey := DefaultJobListKey
+	if len(jobListKeys) != 0 {
+		listKey = jobListKeys[0]
+	}
+	return &Meta{txn: t,
+		StartTS:    txn.StartTS(),
+		jobListKey: listKey,
+	}
 }
 
 // NewSnapshotMeta creates a Meta with snapshot.
@@ -459,11 +467,6 @@ var (
 	AddIndexJobListKey JobListKeyType = mDDLJobAddIdxList
 )
 
-// SetJobListKey sets the job list key.
-func (m *Meta) SetJobListKey(key []byte) {
-	m.jobListKey = key
-}
-
 func (m *Meta) enQueueDDLJob(key []byte, job *model.Job) error {
 	b, err := job.Encode(true)
 	if err != nil {
@@ -505,9 +508,17 @@ func (m *Meta) getDDLJob(key []byte, index int64) (*model.Job, error) {
 }
 
 // GetDDLJob returns the DDL job with index.
-func (m *Meta) GetDDLJob(index int64) (*model.Job, error) {
+// The length of jobListKeys can only be 1 or 0.
+// If its length is 1, we need to replace m.jobListKey with jobListKeys[0].
+// Otherwise, we use m.jobListKey directly.
+func (m *Meta) GetDDLJob(index int64, jobListKeys ...JobListKeyType) (*model.Job, error) {
+	listKey := m.jobListKey
+	if len(jobListKeys) != 0 {
+		listKey = jobListKeys[0]
+	}
+
 	startTime := time.Now()
-	job, err := m.getDDLJob(m.jobListKey, index)
+	job, err := m.getDDLJob(listKey, index)
 	metrics.MetaHistogram.WithLabelValues(metrics.GetDDLJob, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
 	return job, errors.Trace(err)
 }
@@ -524,21 +535,44 @@ func (m *Meta) updateDDLJob(index int64, job *model.Job, key []byte, updateRawAr
 
 // UpdateDDLJob updates the DDL job with index.
 // updateRawArgs is used to determine whether to update the raw args when encode the job.
-func (m *Meta) UpdateDDLJob(index int64, job *model.Job, updateRawArgs bool) error {
+// The length of jobListKeys can only be 1 or 0.
+// If its length is 1, we need to replace m.jobListKey with jobListKeys[0].
+// Otherwise, we use m.jobListKey directly.
+func (m *Meta) UpdateDDLJob(index int64, job *model.Job, updateRawArgs bool, jobListKeys ...JobListKeyType) error {
+	listKey := m.jobListKey
+	if len(jobListKeys) != 0 {
+		listKey = jobListKeys[0]
+	}
+
 	startTime := time.Now()
-	err := m.updateDDLJob(index, job, m.jobListKey, updateRawArgs)
+	err := m.updateDDLJob(index, job, listKey, updateRawArgs)
 	metrics.MetaHistogram.WithLabelValues(metrics.UpdateDDLJob, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
 	return errors.Trace(err)
 }
 
 // DDLJobQueueLen returns the DDL job queue length.
-func (m *Meta) DDLJobQueueLen() (int64, error) {
-	return m.txn.LLen(m.jobListKey)
+// The length of jobListKeys can only be 1 or 0.
+// If its length is 1, we need to replace m.jobListKey with jobListKeys[0].
+// Otherwise, we use m.jobListKey directly.
+func (m *Meta) DDLJobQueueLen(jobListKeys ...JobListKeyType) (int64, error) {
+	listKey := m.jobListKey
+	if len(jobListKeys) != 0 {
+		listKey = jobListKeys[0]
+	}
+	return m.txn.LLen(listKey)
 }
 
-// GetAllDDLJobs gets all DDL Jobs.
-func (m *Meta) GetAllDDLJobs() ([]*model.Job, error) {
-	values, err := m.txn.LGetAll(mDDLJobListKey)
+// GetAllDDLJobsInQueue gets all DDL Jobs in the current queue.
+// The length of jobListKeys can only be 1 or 0.
+// If its length is 1, we need to replace m.jobListKey with jobListKeys[0].
+// Otherwise, we use m.jobListKey directly.
+func (m *Meta) GetAllDDLJobsInQueue(jobListKeys ...JobListKeyType) ([]*model.Job, error) {
+	listKey := m.jobListKey
+	if len(jobListKeys) != 0 {
+		listKey = jobListKeys[0]
+	}
+
+	values, err := m.txn.LGetAll(listKey)
 	if err != nil || values == nil {
 		return nil, errors.Trace(err)
 	}
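All of these accessors share one convention: an optional trailing `jobListKeys ...JobListKeyType` that overrides the Meta's default queue, with at most one key passed in practice. A small usage sketch (the helper name `firstJobs` is illustrative; the calls are the APIs added above, mirroring how util/admin uses them at the end of this diff):

```go
// firstJobs returns the heads of the general queue and the add-index queue.
func firstJobs(txn kv.Transaction) (general, addIdx *model.Job, err error) {
	t := meta.NewMeta(txn) // jobListKey defaults to meta.DefaultJobListKey
	if general, err = t.GetDDLJob(0); err != nil {
		return nil, nil, err
	}
	addIdx, err = t.GetDDLJob(0, meta.AddIndexJobListKey)
	return general, addIdx, err
}
```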
diff --git a/meta/meta_test.go b/meta/meta_test.go
index d514752dc082c..88ae48a2b90ff 100644
--- a/meta/meta_test.go
+++ b/meta/meta_test.go
@@ -311,12 +311,12 @@ func (s *testSuite) TestDDL(c *C) {
 		lastID = job.ID
 	}
 
-	// Test GetAllDDLJobs.
+	// Test GetAllDDLJobsInQueue.
 	err = t.EnQueueDDLJob(job)
 	job1 := &model.Job{ID: 2}
 	err = t.EnQueueDDLJob(job1)
 	c.Assert(err, IsNil)
-	jobs, err := t.GetAllDDLJobs()
+	jobs, err := t.GetAllDDLJobsInQueue()
 	c.Assert(err, IsNil)
 	expectJobs := []*model.Job{job, job1}
 	c.Assert(jobs, DeepEquals, expectJobs)
diff --git a/model/ddl.go b/model/ddl.go
index 596606fa6b7b9..c3716a2fc4f50 100644
--- a/model/ddl.go
+++ b/model/ddl.go
@@ -52,6 +52,9 @@ const (
 	ActionDropTablePartition ActionType = 20
 )
 
+// AddIndexStr is the string representation of the "add index" action.
+const AddIndexStr = "add index"
+
 var actionMap = map[ActionType]string{
 	ActionCreateSchema:   "create schema",
 	ActionDropSchema:     "drop schema",
@@ -59,7 +62,7 @@ var actionMap = map[ActionType]string{
 	ActionDropTable:      "drop table",
 	ActionAddColumn:      "add column",
 	ActionDropColumn:     "drop column",
-	ActionAddIndex:       "add index",
+	ActionAddIndex:       AddIndexStr,
 	ActionDropIndex:      "drop index",
 	ActionAddForeignKey:  "add foreign key",
 	ActionDropForeignKey: "drop foreign key",
@@ -271,6 +274,7 @@ func (job *Job) IsDependentOn(other *Job) (bool, error) {
 		return isDependent, errors.Trace(err)
 	}
 
+	// TODO: If a job is ActionRenameTable, we need to check the table name.
 	if other.TableID == job.TableID {
 		return true, nil
 	}
diff --git a/store/mockstore/mocktikv/cop_handler_dag.go b/store/mockstore/mocktikv/cop_handler_dag.go
index 74c7848f79a11..ab6fa5ca8fc46 100644
--- a/store/mockstore/mocktikv/cop_handler_dag.go
+++ b/store/mockstore/mocktikv/cop_handler_dag.go
@@ -86,7 +86,7 @@ func (lm *locCache) getLoc(name string) (*time.Location, error) {
 	}
 
 	lm.RUnlock()
-	return nil, errors.New(fmt.Sprintf("invalid name for timezone %s", name))
+	return nil, fmt.Errorf("invalid name for timezone %s", name)
 }
 
 type dagContext struct {
diff --git a/util/admin/admin.go b/util/admin/admin.go
index 5517b84f628ac..0b9c2bcc55f74 100644
--- a/util/admin/admin.go
+++ b/util/admin/admin.go
@@ -104,7 +104,11 @@ func CancelJobs(txn kv.Transaction, ids []int64) ([]error, error) {
 			errs[i] = errors.Trace(err)
 			continue
 		}
-		err = t.UpdateDDLJob(int64(j), job, true)
+		if job.Type == model.ActionAddIndex {
+			err = t.UpdateDDLJob(int64(j), job, true, meta.AddIndexJobListKey)
+		} else {
+			err = t.UpdateDDLJob(int64(j), job, true)
+		}
 		if err != nil {
 			errs[i] = errors.Trace(err)
 		}
@@ -116,17 +120,14 @@ func CancelJobs(txn kv.Transaction, ids []int64) ([]error, error) {
 	return errs, nil
 }
 
-// GetDDLJobs returns the DDL jobs and an error.
-func GetDDLJobs(txn kv.Transaction) ([]*model.Job, error) {
-	t := meta.NewMeta(txn)
-	cnt, err := t.DDLJobQueueLen()
+func getDDLJobsInQueue(t *meta.Meta, jobListKey meta.JobListKeyType) ([]*model.Job, error) {
+	cnt, err := t.DDLJobQueueLen(jobListKey)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
-
 	jobs := make([]*model.Job, cnt)
 	for i := range jobs {
-		jobs[i], err = t.GetDDLJob(int64(i))
+		jobs[i], err = t.GetDDLJob(int64(i), jobListKey)
 		if err != nil {
 			return nil, errors.Trace(err)
 		}
@@ -134,6 +135,21 @@ func GetDDLJobs(txn kv.Transaction) ([]*model.Job, error) {
 	return jobs, nil
 }
 
+// GetDDLJobs returns all DDL jobs.
+// TODO: Sort jobs.
+func GetDDLJobs(txn kv.Transaction) ([]*model.Job, error) {
+	t := meta.NewMeta(txn)
+	generalJobs, err := getDDLJobsInQueue(t, meta.DefaultJobListKey)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	addIdxJobs, err := getDDLJobsInQueue(t, meta.AddIndexJobListKey)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	return append(generalJobs, addIdxJobs...), nil
+}
+
 // MaxHistoryJobs is exported for testing.
 const MaxHistoryJobs = 10
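Taken together, a job's `ActionType` decides everything about its routing: which queue `addDDLJob` enqueues it to (via `newMetaWithQueueTp`), which worker channel `asyncNotifyWorker` signals, and which list key admin tooling must pass (as in `CancelJobs` above). That mapping condensed into one helper (`queueKeyFor` is an illustrative name, not part of the patch):

```go
// queueKeyFor mirrors the dispatch rule used throughout this change:
// "add index" jobs live in the add-index queue, all other DDL jobs in
// the default queue.
func queueKeyFor(tp model.ActionType) meta.JobListKeyType {
	if tp == model.ActionAddIndex {
		return meta.AddIndexJobListKey
	}
	return meta.DefaultJobListKey
}
```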