Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl, meta: clean the jobs in adding index queue #6161

Merged
merged 10 commits into from
Apr 4, 2018
1 change: 0 additions & 1 deletion ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ func checkHistoryJob(c *C, job *model.Job) {
}

func checkHistoryJobArgs(c *C, ctx sessionctx.Context, id int64, args *historyJobArgs) {
c.Assert(ctx.NewTxn(), IsNil)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why remove?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make TestCleanJobs stable.
I want to get schema version and check history job's schema version in a transaction.

t := meta.NewMeta(ctx.Txn())
historyJob, err := t.GetHistoryDDLJob(id)
c.Assert(err, IsNil)
Expand Down
81 changes: 79 additions & 2 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ func (d *ddl) onDDLWorker() {
metrics.PanicCounter.WithLabelValues(metrics.LabelDDL).Inc()
}
}()

// shouldCleanJobs is used to determine whether to clean up the job in adding index queue.
shouldCleanJobs := true
for {
select {
case <-ticker.C:
Expand All @@ -61,9 +64,12 @@ func (d *ddl) onDDLWorker() {
return
}

err := d.handleDDLJobQueue()
err := d.handleDDLJobQueue(shouldCleanJobs)
if err != nil {
log.Errorf("[ddl] handle ddl job err %v", errors.ErrorStack(err))
} else if shouldCleanJobs {
log.Info("[ddl] cleaning jobs in the adding index queue finished.")
shouldCleanJobs = false
}
}
}
Expand Down Expand Up @@ -204,7 +210,9 @@ func (d *ddl) getHistoryDDLJob(id int64) (*model.Job, error) {
return job, errors.Trace(err)
}

func (d *ddl) handleDDLJobQueue() error {
// handleDDLJobQueue handles DDL jobs in DDL Job queue.
// shouldCleanJobs is used to determine whether to clean up the job in adding index queue.
func (d *ddl) handleDDLJobQueue(shouldCleanJobs bool) error {
once := true
for {
if d.isClosed() {
Expand All @@ -220,6 +228,12 @@ func (d *ddl) handleDDLJobQueue() error {
return nil
}

// It's used for clean up the job in adding index queue before we support adding index queue.
// TODO: Remove this logic after we support the adding index queue.
if shouldCleanJobs {
return errors.Trace(d.cleanAddIndexQueueJobs(txn))
}

var err error
t := meta.NewMeta(txn)
// We become the owner. Get the first job and run it.
Expand Down Expand Up @@ -479,3 +493,66 @@ func updateSchemaVersion(t *meta.Meta, job *model.Job) (int64, error) {
err = t.SetSchemaDiff(diff)
return schemaVersion, errors.Trace(err)
}

// cleanAddIndexQueueJobs cleans jobs in adding index queue.
// It's only done once after the worker become the owner.
// TODO: Remove this logic after we support the adding index queue.
func (d *ddl) cleanAddIndexQueueJobs(txn kv.Transaction) error {
startTime := time.Now()
m := meta.NewMeta(txn)
m.SetJobListKey(meta.AddIndexJobListKey)
for {
job, err := d.getFirstDDLJob(m)
if err != nil {
return errors.Trace(err)
}
if job == nil {
log.Infof("[ddl] cleaning jobs in the adding index queue takes time %v.", time.Since(startTime))
return nil
}
log.Infof("[ddl] cleaning job %v in the adding index queue.", job)

// The types of these jobs must be ActionAddIndex.
if job.SchemaState == model.StatePublic || job.SchemaState == model.StateNone {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have to rollback or cancel the add index ddl job? What about enqueueing these jobs to default job list

Copy link
Contributor Author

@zimulala zimulala Apr 2, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@winkyao @coocood
If we put it in the default jobs list, we need to consider the order. And I hope this PR will change the original logic as little as possible.

Copy link
Member

@coocood coocood Apr 2, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we just put all the add index jobs at the end of the default list, what the worst-case result would be?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@coocood
Added an index that should not be added.
e.g.
original order: drop column col1 -> add index idx1(col1) should be failed -> add column col1
current order: drop column col1 -> add column col1 -> add index idx1(col1) successful

if job.SchemaState == model.StateNone {
job.State = model.JobStateCancelled
} else {
binloginfo.SetDDLBinlog(d.workerVars.BinlogClient, txn, job.ID, job.Query)
job.State = model.JobStateSynced
}
err = d.finishDDLJob(m, job)
if err != nil {
return errors.Trace(err)
}
continue
}

// When the job not in "none" and "public" state, we need to rollback it.
schemaID := job.SchemaID
tblInfo, err := getTableInfo(m, job, schemaID)
if err != nil {
return errors.Trace(err)
}
var indexName model.CIStr
var unique bool
err = job.DecodeArgs(&unique, &indexName)
if err != nil {
return errors.Trace(err)
}
indexInfo := findIndexByName(indexName.L, tblInfo.Indices)
_, err = d.convert2RollbackJob(m, job, tblInfo, indexInfo, nil)
if err == nil {
_, err = m.DeQueueDDLJob()
}
if err != nil {
return errors.Trace(err)
}
// Put the job to the default job list.
m.SetJobListKey(meta.DefaultJobListKey)
err = m.EnQueueDDLJob(job)
m.SetJobListKey(meta.AddIndexJobListKey)
if err != nil {
return errors.Trace(err)
}
}
}
117 changes: 117 additions & 0 deletions ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,123 @@ func (s *testDDLSuite) TestRunWorker(c *C) {
<-exitCh
}

func (s *testDDLSuite) TestCleanJobs(c *C) {
defer testleak.AfterTest(c)()
store := testCreateStore(c, "test_clean_jobs")
defer store.Close()
d := testNewDDL(context.Background(), nil, store, nil, nil, testLease)

ctx := testNewContext(d)
dbInfo := testSchemaInfo(c, d, "test")
testCreateSchema(c, ctx, d, dbInfo)
tblInfo := testTableInfo(c, d, "t", 2)
testCreateTable(c, ctx, d, dbInfo, tblInfo)

var failedJobIDs []int64
job := &model.Job{
SchemaID: dbInfo.ID,
TableID: tblInfo.ID,
Type: model.ActionAddIndex,
BinlogInfo: &model.HistoryInfo{},
}
idxColNames := []*ast.IndexColName{{
Column: &ast.ColumnName{Name: model.NewCIStr("c1")},
Length: types.UnspecifiedLength}}
// Add some adding index jobs to AddIndexJobList.
backfillAddIndexJob := func(jobArgs []interface{}) {
kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
var err error
t := meta.NewMeta(txn)
t.SetJobListKey(meta.AddIndexJobListKey)
job.ID, err = t.GenGlobalID()
c.Assert(err, IsNil)
failedJobIDs = append(failedJobIDs, job.ID)
job.Args = jobArgs
err = t.EnQueueDDLJob(job)
c.Assert(err, IsNil)
return nil
})
}

// Add a StateNone job.
indexName := model.NewCIStr("idx_none")
args := []interface{}{false, indexName, idxColNames, nil}
backfillAddIndexJob(args)
// Add a StateDeleteOnly job.
indexName = model.NewCIStr("idx_delete_only")
args = []interface{}{false, indexName, idxColNames, nil}
backfillAddIndexJob(args)
changeJobState := func() {
kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
t.SetJobListKey(meta.AddIndexJobListKey)
lastJobID := int64(len(failedJobIDs) - 1)
job, err1 := t.GetDDLJob(lastJobID)
c.Assert(err1, IsNil)
_, err1 = d.runDDLJob(t, job)
c.Assert(err1, IsNil)
_, err1 = updateSchemaVersion(t, job)
c.Assert(err1, IsNil)
err1 = t.UpdateDDLJob(lastJobID, job, true)
c.Assert(err1, IsNil)
return nil
})
err := d.callHookOnChanged(nil)
c.Assert(err, IsNil)
}
changeJobState()
// Add a StateWriteReorganization job.
indexName = model.NewCIStr("idx_write_reorg")
args = []interface{}{false, indexName, idxColNames, nil}
backfillAddIndexJob(args)
changeJobState() // convert to delete only
changeJobState() // convert to write only
changeJobState() // convert to write reorg
writeReorgTbl, err := getCurrentTable(d, dbInfo.ID, tblInfo.ID)
c.Assert(err, IsNil)

err = d.Stop()
c.Assert(err, IsNil)
// Make sure shouldCleanJobs is ture.
d = testNewDDL(context.Background(), nil, store, nil, nil, testLease)
defer d.Stop()
testCreateIndex(c, ctx, d, dbInfo, writeReorgTbl.Meta(), false, "idx_normal", "c2")

// Make sure all DDL jobs are done.
for {
var isAllJobDone bool
kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
len, err := t.DDLJobQueueLen()
c.Assert(err, IsNil)
if len == 0 {
isAllJobDone = true
}
return nil
})
if isAllJobDone {
break
}
time.Sleep(time.Millisecond)
}

// Check that the jobs in add index list are finished.
kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
for i, id := range failedJobIDs {
historyJob, err := t.GetHistoryDDLJob(id)
c.Assert(err, IsNil)
c.Assert(historyJob, NotNil)
if i == 0 {
c.Assert(historyJob.State, Equals, model.JobStateCancelled)
} else {
c.Assert(historyJob.State, Equals, model.JobStateRollbackDone)
}
}
return nil
})
}

func (s *testDDLSuite) TestSchemaError(c *C) {
defer testleak.AfterTest(c)()
store := testCreateStore(c, "test_schema_error")
Expand Down
5 changes: 2 additions & 3 deletions ddl/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ LOOP:
case <-ticker.C:
d.Stop()
d.start(context.Background())
time.Sleep(time.Millisecond * 20)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why add sleep?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we add check add index list, it makes processing the first job a bit slower.

case err := <-done:
c.Assert(err, IsNil)
break LOOP
Expand All @@ -239,22 +240,20 @@ func (s *testSchemaSuite) TestSchemaResume(c *C) {
testCheckOwner(c, d1, true)

dbInfo := testSchemaInfo(c, d1, "test")

job := &model.Job{
SchemaID: dbInfo.ID,
Type: model.ActionCreateSchema,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{dbInfo},
}

testRunInterruptedJob(c, d1, job)
testCheckSchemaState(c, d1, dbInfo, model.StatePublic)

job = &model.Job{
SchemaID: dbInfo.ID,
Type: model.ActionDropSchema,
BinlogInfo: &model.HistoryInfo{},
}

testRunInterruptedJob(c, d1, job)
testCheckSchemaState(c, d1, dbInfo, model.StateNone)
}
1 change: 1 addition & 0 deletions ddl/stat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ LOOP:
d.close()
c.Assert(s.getDDLSchemaVer(c, d), GreaterEqual, ver)
d.start(context.Background())
time.Sleep(time.Millisecond * 20)
case err := <-done:
c.Assert(err, IsNil)
// TODO: Get this information from etcd.
Expand Down
34 changes: 25 additions & 9 deletions meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,17 @@ var (

// Meta is for handling meta information in a transaction.
type Meta struct {
txn *structure.TxStructure
StartTS uint64 // StartTS is the txn's start TS.
txn *structure.TxStructure
StartTS uint64 // StartTS is the txn's start TS.
jobListKey JobListKeyType
}

// NewMeta creates a Meta in transaction txn.
func NewMeta(txn kv.Transaction) *Meta {
txn.SetOption(kv.Priority, kv.PriorityHigh)
txn.SetOption(kv.SyncLog, true)
t := structure.NewStructure(txn, txn, mMetaPrefix)
return &Meta{txn: t, StartTS: txn.StartTS()}
return &Meta{txn: t, StartTS: txn.StartTS(), jobListKey: DefaultJobListKey}
}

// NewSnapshotMeta creates a Meta with snapshot.
Expand Down Expand Up @@ -434,7 +435,6 @@ func (m *Meta) GetTable(dbID int64, tableID int64) (*model.TableInfo, error) {
}

// DDL job structure
// DDLOnwer: []byte
// DDLJobList: list jobs
// DDLJobHistory: hash
// DDLJobReorg: hash
Expand All @@ -444,10 +444,26 @@ func (m *Meta) GetTable(dbID int64, tableID int64) (*model.TableInfo, error) {

var (
mDDLJobListKey = []byte("DDLJobList")
mDDLJobAddIdxList = []byte("DDLJobAddIdxList")
mDDLJobHistoryKey = []byte("DDLJobHistory")
mDDLJobReorgKey = []byte("DDLJobReorg")
)

// JobListKeyType is a key type of the DDL job queue.
type JobListKeyType []byte

var (
// DefaultJobListKey keeps all actions of DDL jobs.
DefaultJobListKey JobListKeyType = mDDLJobListKey
// AddIndexJobListKey only keeps the action of adding index.
AddIndexJobListKey JobListKeyType = mDDLJobAddIdxList
)

// SetJobListKey sets the job list key.
func (m *Meta) SetJobListKey(key []byte) {
m.jobListKey = key
}

func (m *Meta) enQueueDDLJob(key []byte, job *model.Job) error {
b, err := job.Encode(true)
if err != nil {
Expand All @@ -458,7 +474,7 @@ func (m *Meta) enQueueDDLJob(key []byte, job *model.Job) error {

// EnQueueDDLJob adds a DDL job to the list.
func (m *Meta) EnQueueDDLJob(job *model.Job) error {
return m.enQueueDDLJob(mDDLJobListKey, job)
return m.enQueueDDLJob(m.jobListKey, job)
}

func (m *Meta) deQueueDDLJob(key []byte) (*model.Job, error) {
Expand All @@ -474,7 +490,7 @@ func (m *Meta) deQueueDDLJob(key []byte) (*model.Job, error) {

// DeQueueDDLJob pops a DDL job from the list.
func (m *Meta) DeQueueDDLJob() (*model.Job, error) {
return m.deQueueDDLJob(mDDLJobListKey)
return m.deQueueDDLJob(m.jobListKey)
}

func (m *Meta) getDDLJob(key []byte, index int64) (*model.Job, error) {
Expand All @@ -491,7 +507,7 @@ func (m *Meta) getDDLJob(key []byte, index int64) (*model.Job, error) {
// GetDDLJob returns the DDL job with index.
func (m *Meta) GetDDLJob(index int64) (*model.Job, error) {
startTime := time.Now()
job, err := m.getDDLJob(mDDLJobListKey, index)
job, err := m.getDDLJob(m.jobListKey, index)
metrics.MetaHistogram.WithLabelValues(metrics.GetDDLJob, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
return job, errors.Trace(err)
}
Expand All @@ -510,14 +526,14 @@ func (m *Meta) updateDDLJob(index int64, job *model.Job, key []byte, updateRawAr
// updateRawArgs is used to determine whether to update the raw args when encode the job.
func (m *Meta) UpdateDDLJob(index int64, job *model.Job, updateRawArgs bool) error {
startTime := time.Now()
err := m.updateDDLJob(index, job, mDDLJobListKey, updateRawArgs)
err := m.updateDDLJob(index, job, m.jobListKey, updateRawArgs)
metrics.MetaHistogram.WithLabelValues(metrics.UpdateDDLJob, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
return errors.Trace(err)
}

// DDLJobQueueLen returns the DDL job queue length.
func (m *Meta) DDLJobQueueLen() (int64, error) {
return m.txn.LLen(mDDLJobListKey)
return m.txn.LLen(m.jobListKey)
}

func (m *Meta) jobIDKey(id int64) []byte {
Expand Down