Commit
ddl, meta: clean the jobs in adding index queue (#6161)
zimulala authored Apr 4, 2018
1 parent 6287cfb commit cb04e97
Showing 6 changed files with 224 additions and 15 deletions.
1 change: 0 additions & 1 deletion ddl/ddl_test.go
@@ -91,7 +91,6 @@ func checkHistoryJob(c *C, job *model.Job) {
}

func checkHistoryJobArgs(c *C, ctx sessionctx.Context, id int64, args *historyJobArgs) {
c.Assert(ctx.NewTxn(), IsNil)
t := meta.NewMeta(ctx.Txn())
historyJob, err := t.GetHistoryDDLJob(id)
c.Assert(err, IsNil)
81 changes: 79 additions & 2 deletions ddl/ddl_worker.go
@@ -52,6 +52,9 @@ func (d *ddl) onDDLWorker() {
metrics.PanicCounter.WithLabelValues(metrics.LabelDDL).Inc()
}
}()

// shouldCleanJobs is used to determine whether to clean up the jobs in the adding index queue.
shouldCleanJobs := true
for {
select {
case <-ticker.C:
@@ -61,9 +64,12 @@
return
}

err := d.handleDDLJobQueue()
err := d.handleDDLJobQueue(shouldCleanJobs)
if err != nil {
log.Errorf("[ddl] handle ddl job err %v", errors.ErrorStack(err))
} else if shouldCleanJobs {
log.Info("[ddl] cleaning jobs in the adding index queue finished.")
shouldCleanJobs = false
}
}
}
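
A note on the flow above: an error from handleDDLJobQueue leaves shouldCleanJobs set, so the cleanup is attempted again on the next tick, and only a call that returns nil disables it. Below is a minimal sketch of that control flow using hypothetical stand-in types, not the actual ddl worker:

package sketch

import (
	"log"
	"time"
)

// jobHandler is a stand-in for the single ddl method this hunk touches.
type jobHandler interface {
	handleDDLJobQueue(shouldCleanJobs bool) error
}

// workerLoop mirrors the onDDLWorker change: the cleanup of the adding index
// queue is attempted until one pass returns without error, after which the
// worker only handles the default job queue.
func workerLoop(d jobHandler, tick <-chan time.Time, quit <-chan struct{}) {
	shouldCleanJobs := true
	for {
		select {
		case <-quit:
			return
		case <-tick:
		}
		if err := d.handleDDLJobQueue(shouldCleanJobs); err != nil {
			log.Printf("[ddl] handle ddl job err %v", err) // cleanup is retried on the next tick
			continue
		}
		if shouldCleanJobs {
			log.Print("[ddl] cleaning jobs in the adding index queue finished")
			shouldCleanJobs = false // one successful pass is enough
		}
	}
}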
@@ -204,7 +210,9 @@ func (d *ddl) getHistoryDDLJob(id int64) (*model.Job, error) {
return job, errors.Trace(err)
}

func (d *ddl) handleDDLJobQueue() error {
// handleDDLJobQueue handles DDL jobs in the DDL job queue.
// shouldCleanJobs is used to determine whether to clean up the jobs in the adding index queue.
func (d *ddl) handleDDLJobQueue(shouldCleanJobs bool) error {
once := true
for {
if d.isClosed() {
@@ -220,6 +228,12 @@
return nil
}

// It's used to clean up the jobs in the adding index queue, since handling that queue isn't supported yet.
// TODO: Remove this logic after we support the adding index queue.
if shouldCleanJobs {
return errors.Trace(d.cleanAddIndexQueueJobs(txn))
}

var err error
t := meta.NewMeta(txn)
// We become the owner. Get the first job and run it.
@@ -479,3 +493,66 @@ func updateSchemaVersion(t *meta.Meta, job *model.Job) (int64, error) {
err = t.SetSchemaDiff(diff)
return schemaVersion, errors.Trace(err)
}

// cleanAddIndexQueueJobs cleans the jobs in the adding index queue.
// It is only done once, after the worker becomes the owner.
// TODO: Remove this logic after we support the adding index queue.
func (d *ddl) cleanAddIndexQueueJobs(txn kv.Transaction) error {
startTime := time.Now()
m := meta.NewMeta(txn)
m.SetJobListKey(meta.AddIndexJobListKey)
for {
job, err := d.getFirstDDLJob(m)
if err != nil {
return errors.Trace(err)
}
if job == nil {
log.Infof("[ddl] cleaning jobs in the adding index queue takes time %v.", time.Since(startTime))
return nil
}
log.Infof("[ddl] cleaning job %v in the adding index queue.", job)

// The types of these jobs must be ActionAddIndex.
if job.SchemaState == model.StatePublic || job.SchemaState == model.StateNone {
if job.SchemaState == model.StateNone {
job.State = model.JobStateCancelled
} else {
binloginfo.SetDDLBinlog(d.workerVars.BinlogClient, txn, job.ID, job.Query)
job.State = model.JobStateSynced
}
err = d.finishDDLJob(m, job)
if err != nil {
return errors.Trace(err)
}
continue
}

// When the job is in neither the "none" nor the "public" state, we need to roll it back.
schemaID := job.SchemaID
tblInfo, err := getTableInfo(m, job, schemaID)
if err != nil {
return errors.Trace(err)
}
var indexName model.CIStr
var unique bool
err = job.DecodeArgs(&unique, &indexName)
if err != nil {
return errors.Trace(err)
}
indexInfo := findIndexByName(indexName.L, tblInfo.Indices)
_, err = d.convert2RollbackJob(m, job, tblInfo, indexInfo, nil)
if err == nil {
_, err = m.DeQueueDDLJob()
}
if err != nil {
return errors.Trace(err)
}
// Put the job into the default job list.
m.SetJobListKey(meta.DefaultJobListKey)
err = m.EnQueueDDLJob(job)
m.SetJobListKey(meta.AddIndexJobListKey)
if err != nil {
return errors.Trace(err)
}
}
}
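
The cleanup above relies on the per-handle queue selector added to meta.go further down in this diff: pointing the Meta handle at AddIndexJobListKey makes every queue operation act on the adding-index list, and a job that needs a rollback is re-enqueued under DefaultJobListKey so the normal worker path can finish it. A rough sketch of that move, a hypothetical helper rather than code from this commit (import paths assumed to match the tree at the time):

package sketch

import (
	"github.com/juju/errors"
	"github.com/pingcap/tidb/meta"
)

// moveToDefaultQueue shows the queue-switching pattern cleanAddIndexQueueJobs
// uses: pop a job from the adding-index list, push it onto the default list,
// then point the handle back at the adding-index list for the next iteration.
func moveToDefaultQueue(t *meta.Meta) error {
	t.SetJobListKey(meta.AddIndexJobListKey)
	job, err := t.DeQueueDDLJob()
	if err != nil || job == nil {
		return errors.Trace(err)
	}
	t.SetJobListKey(meta.DefaultJobListKey)
	if err = t.EnQueueDDLJob(job); err != nil {
		return errors.Trace(err)
	}
	t.SetJobListKey(meta.AddIndexJobListKey)
	return nil
}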
117 changes: 117 additions & 0 deletions ddl/ddl_worker_test.go
@@ -105,6 +105,123 @@ func (s *testDDLSuite) TestRunWorker(c *C) {
<-exitCh
}

func (s *testDDLSuite) TestCleanJobs(c *C) {
defer testleak.AfterTest(c)()
store := testCreateStore(c, "test_clean_jobs")
defer store.Close()
d := testNewDDL(context.Background(), nil, store, nil, nil, testLease)

ctx := testNewContext(d)
dbInfo := testSchemaInfo(c, d, "test")
testCreateSchema(c, ctx, d, dbInfo)
tblInfo := testTableInfo(c, d, "t", 2)
testCreateTable(c, ctx, d, dbInfo, tblInfo)

var failedJobIDs []int64
job := &model.Job{
SchemaID: dbInfo.ID,
TableID: tblInfo.ID,
Type: model.ActionAddIndex,
BinlogInfo: &model.HistoryInfo{},
}
idxColNames := []*ast.IndexColName{{
Column: &ast.ColumnName{Name: model.NewCIStr("c1")},
Length: types.UnspecifiedLength}}
// Add some adding index jobs to AddIndexJobList.
backfillAddIndexJob := func(jobArgs []interface{}) {
kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
var err error
t := meta.NewMeta(txn)
t.SetJobListKey(meta.AddIndexJobListKey)
job.ID, err = t.GenGlobalID()
c.Assert(err, IsNil)
failedJobIDs = append(failedJobIDs, job.ID)
job.Args = jobArgs
err = t.EnQueueDDLJob(job)
c.Assert(err, IsNil)
return nil
})
}

// Add a StateNone job.
indexName := model.NewCIStr("idx_none")
args := []interface{}{false, indexName, idxColNames, nil}
backfillAddIndexJob(args)
// Add a StateDeleteOnly job.
indexName = model.NewCIStr("idx_delete_only")
args = []interface{}{false, indexName, idxColNames, nil}
backfillAddIndexJob(args)
changeJobState := func() {
kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
t.SetJobListKey(meta.AddIndexJobListKey)
lastJobID := int64(len(failedJobIDs) - 1)
job, err1 := t.GetDDLJob(lastJobID)
c.Assert(err1, IsNil)
_, err1 = d.runDDLJob(t, job)
c.Assert(err1, IsNil)
_, err1 = updateSchemaVersion(t, job)
c.Assert(err1, IsNil)
err1 = t.UpdateDDLJob(lastJobID, job, true)
c.Assert(err1, IsNil)
return nil
})
err := d.callHookOnChanged(nil)
c.Assert(err, IsNil)
}
changeJobState()
// Add a StateWriteReorganization job.
indexName = model.NewCIStr("idx_write_reorg")
args = []interface{}{false, indexName, idxColNames, nil}
backfillAddIndexJob(args)
changeJobState() // convert to delete only
changeJobState() // convert to write only
changeJobState() // convert to write reorg
writeReorgTbl, err := getCurrentTable(d, dbInfo.ID, tblInfo.ID)
c.Assert(err, IsNil)

err = d.Stop()
c.Assert(err, IsNil)
// Make sure shouldCleanJobs is true.
d = testNewDDL(context.Background(), nil, store, nil, nil, testLease)
defer d.Stop()
testCreateIndex(c, ctx, d, dbInfo, writeReorgTbl.Meta(), false, "idx_normal", "c2")

// Make sure all DDL jobs are done.
for {
var isAllJobDone bool
kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
len, err := t.DDLJobQueueLen()
c.Assert(err, IsNil)
if len == 0 {
isAllJobDone = true
}
return nil
})
if isAllJobDone {
break
}
time.Sleep(time.Millisecond)
}

// Check that the jobs in add index list are finished.
kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
for i, id := range failedJobIDs {
historyJob, err := t.GetHistoryDDLJob(id)
c.Assert(err, IsNil)
c.Assert(historyJob, NotNil)
if i == 0 {
c.Assert(historyJob.State, Equals, model.JobStateCancelled)
} else {
c.Assert(historyJob.State, Equals, model.JobStateRollbackDone)
}
}
return nil
})
}
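
Put differently, the assertions above capture the state mapping that cleanAddIndexQueueJobs applies: a job that never left StateNone is cancelled outright, while anything that already touched the schema is rolled back and lands in the history queue as JobStateRollbackDone (a job already in StatePublic would simply be finished as JobStateSynced). A hypothetical helper, not part of the commit, that states that expectation (import path assumed to match the tree at the time):

package sketch

import "github.com/pingcap/tidb/model"

// expectedFinalState is illustrative only; it mirrors the branches in
// cleanAddIndexQueueJobs and the assertions in TestCleanJobs.
func expectedFinalState(s model.SchemaState) model.JobState {
	switch s {
	case model.StateNone:
		return model.JobStateCancelled // nothing was applied, cancel outright
	case model.StatePublic:
		return model.JobStateSynced // the index is already usable, just finish the job
	default:
		return model.JobStateRollbackDone // partially applied, rolled back first
	}
}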

func (s *testDDLSuite) TestSchemaError(c *C) {
defer testleak.AfterTest(c)()
store := testCreateStore(c, "test_schema_error")
5 changes: 2 additions & 3 deletions ddl/schema_test.go
@@ -221,6 +221,7 @@ LOOP:
case <-ticker.C:
d.Stop()
d.start(context.Background())
time.Sleep(time.Millisecond * 20)
case err := <-done:
c.Assert(err, IsNil)
break LOOP
@@ -239,22 +240,20 @@ func (s *testSchemaSuite) TestSchemaResume(c *C) {
testCheckOwner(c, d1, true)

dbInfo := testSchemaInfo(c, d1, "test")

job := &model.Job{
SchemaID: dbInfo.ID,
Type: model.ActionCreateSchema,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{dbInfo},
}

testRunInterruptedJob(c, d1, job)
testCheckSchemaState(c, d1, dbInfo, model.StatePublic)

job = &model.Job{
SchemaID: dbInfo.ID,
Type: model.ActionDropSchema,
BinlogInfo: &model.HistoryInfo{},
}

testRunInterruptedJob(c, d1, job)
testCheckSchemaState(c, d1, dbInfo, model.StateNone)
}
1 change: 1 addition & 0 deletions ddl/stat_test.go
@@ -78,6 +78,7 @@ LOOP:
d.close()
c.Assert(s.getDDLSchemaVer(c, d), GreaterEqual, ver)
d.start(context.Background())
time.Sleep(time.Millisecond * 20)
case err := <-done:
c.Assert(err, IsNil)
// TODO: Get this information from etcd.
34 changes: 25 additions & 9 deletions meta/meta.go
@@ -80,16 +80,17 @@ var (

// Meta is for handling meta information in a transaction.
type Meta struct {
txn *structure.TxStructure
StartTS uint64 // StartTS is the txn's start TS.
txn *structure.TxStructure
StartTS uint64 // StartTS is the txn's start TS.
jobListKey JobListKeyType
}

// NewMeta creates a Meta in transaction txn.
func NewMeta(txn kv.Transaction) *Meta {
txn.SetOption(kv.Priority, kv.PriorityHigh)
txn.SetOption(kv.SyncLog, true)
t := structure.NewStructure(txn, txn, mMetaPrefix)
return &Meta{txn: t, StartTS: txn.StartTS()}
return &Meta{txn: t, StartTS: txn.StartTS(), jobListKey: DefaultJobListKey}
}

// NewSnapshotMeta creates a Meta with snapshot.
@@ -434,7 +435,6 @@ func (m *Meta) GetTable(dbID int64, tableID int64) (*model.TableInfo, error) {
}

// DDL job structure
// DDLOnwer: []byte
// DDLJobList: list jobs
// DDLJobHistory: hash
// DDLJobReorg: hash
@@ -444,10 +444,26 @@

var (
mDDLJobListKey = []byte("DDLJobList")
mDDLJobAddIdxList = []byte("DDLJobAddIdxList")
mDDLJobHistoryKey = []byte("DDLJobHistory")
mDDLJobReorgKey = []byte("DDLJobReorg")
)

// JobListKeyType is a key type of the DDL job queue.
type JobListKeyType []byte

var (
// DefaultJobListKey keeps all actions of DDL jobs.
DefaultJobListKey JobListKeyType = mDDLJobListKey
// AddIndexJobListKey only keeps the action of adding index.
AddIndexJobListKey JobListKeyType = mDDLJobAddIdxList
)

// SetJobListKey sets the job list key.
func (m *Meta) SetJobListKey(key []byte) {
m.jobListKey = key
}

func (m *Meta) enQueueDDLJob(key []byte, job *model.Job) error {
b, err := job.Encode(true)
if err != nil {
@@ -458,7 +474,7 @@ func (m *Meta) enQueueDDLJob(key []byte, job *model.Job) error {

// EnQueueDDLJob adds a DDL job to the list.
func (m *Meta) EnQueueDDLJob(job *model.Job) error {
return m.enQueueDDLJob(mDDLJobListKey, job)
return m.enQueueDDLJob(m.jobListKey, job)
}

func (m *Meta) deQueueDDLJob(key []byte) (*model.Job, error) {
@@ -474,7 +490,7 @@ func (m *Meta) deQueueDDLJob(key []byte) (*model.Job, error) {

// DeQueueDDLJob pops a DDL job from the list.
func (m *Meta) DeQueueDDLJob() (*model.Job, error) {
return m.deQueueDDLJob(mDDLJobListKey)
return m.deQueueDDLJob(m.jobListKey)
}

func (m *Meta) getDDLJob(key []byte, index int64) (*model.Job, error) {
@@ -491,7 +507,7 @@ func (m *Meta) getDDLJob(key []byte, index int64) (*model.Job, error) {
// GetDDLJob returns the DDL job with index.
func (m *Meta) GetDDLJob(index int64) (*model.Job, error) {
startTime := time.Now()
job, err := m.getDDLJob(mDDLJobListKey, index)
job, err := m.getDDLJob(m.jobListKey, index)
metrics.MetaHistogram.WithLabelValues(metrics.GetDDLJob, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
return job, errors.Trace(err)
}
@@ -510,14 +526,14 @@ func (m *Meta) updateDDLJob(index int64, job *model.Job, key []byte, updateRawAr
// updateRawArgs is used to determine whether to update the raw args when encoding the job.
func (m *Meta) UpdateDDLJob(index int64, job *model.Job, updateRawArgs bool) error {
startTime := time.Now()
err := m.updateDDLJob(index, job, mDDLJobListKey, updateRawArgs)
err := m.updateDDLJob(index, job, m.jobListKey, updateRawArgs)
metrics.MetaHistogram.WithLabelValues(metrics.UpdateDDLJob, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
return errors.Trace(err)
}

// DDLJobQueueLen returns the DDL job queue length.
func (m *Meta) DDLJobQueueLen() (int64, error) {
return m.txn.LLen(mDDLJobListKey)
return m.txn.LLen(m.jobListKey)
}

func (m *Meta) jobIDKey(id int64) []byte {

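
Taken together, the meta.go changes make the job-queue key a per-handle setting: NewMeta still starts on DefaultJobListKey, and SetJobListKey redirects EnQueueDDLJob, DeQueueDDLJob, GetDDLJob, UpdateDDLJob, and DDLJobQueueLen to whichever list is selected. A short usage sketch under those assumptions (hypothetical helper, import paths assumed to match the tree at the time):

package sketch

import (
	"github.com/juju/errors"
	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/meta"
)

// queueLengths reports the length of both DDL job lists through one Meta
// handle, switching the handle's job list key in between.
func queueLengths(txn kv.Transaction) (defaultLen, addIdxLen int64, err error) {
	m := meta.NewMeta(txn) // starts on meta.DefaultJobListKey
	if defaultLen, err = m.DDLJobQueueLen(); err != nil {
		return 0, 0, errors.Trace(err)
	}
	m.SetJobListKey(meta.AddIndexJobListKey) // switch this handle to the adding-index list
	if addIdxLen, err = m.DDLJobQueueLen(); err != nil {
		return 0, 0, errors.Trace(err)
	}
	return defaultLen, addIdxLen, nil
}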