Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: support set relation job #6206

Merged
merged 5 commits into from
Apr 13, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,29 @@ func (d *ddl) isOwner() bool {
return isOwner
}

// buildJobDependence sets the current job's dependence-ID that the current job depends on.
// The dependent job's ID must less than the current job's ID, and we need the largest one in the list.
func buildJobDependence(t *meta.Meta, curJob *model.Job) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add comments for what is a dependence job?

jobs, err := t.GetAllDDLJobs()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we run this check in a transaction? For example, two DDLs come in the same time. Will this function detect the dependence?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's in a transaction. The t has a field of transaction.

if err != nil {
return errors.Trace(err)
}
for _, job := range jobs {
if curJob.ID < job.ID {
continue
}
isDependent, err := curJob.IsDependentOn(job)
if err != nil {
return errors.Trace(err)
}
if isDependent {
curJob.DependentID = job.ID
break
}
}
return nil
}

// addDDLJob gets a global job ID and puts the DDL job in the DDL queue.
func (d *ddl) addDDLJob(ctx sessionctx.Context, job *model.Job) error {
startTime := time.Now()
Expand Down
50 changes: 50 additions & 0 deletions ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,3 +476,53 @@ func (s *testDDLSuite) TestIgnorableSpec(c *C) {
c.Assert(isIgnorableSpec(spec), IsTrue)
}
}

func (s *testDDLSuite) TestBuildJobDependence(c *C) {
defer testleak.AfterTest(c)()
store := testCreateStore(c, "test_set_job_relation")
defer store.Close()

job1 := &model.Job{ID: 1, TableID: 1}
job2 := &model.Job{ID: 2, TableID: 1}
job3 := &model.Job{ID: 3, TableID: 2}
job6 := &model.Job{ID: 6, TableID: 1}
job7 := &model.Job{ID: 7, TableID: 2}
kv.RunInNewTxn(store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
err := t.EnQueueDDLJob(job1)
c.Assert(err, IsNil)
err = t.EnQueueDDLJob(job2)
c.Assert(err, IsNil)
err = t.EnQueueDDLJob(job3)
c.Assert(err, IsNil)
err = t.EnQueueDDLJob(job6)
c.Assert(err, IsNil)
err = t.EnQueueDDLJob(job7)
c.Assert(err, IsNil)
return nil
})
job4 := &model.Job{ID: 4, TableID: 1}
kv.RunInNewTxn(store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
err := buildJobDependence(t, job4)
c.Assert(err, IsNil)
c.Assert(job4.DependentID, Equals, int64(2))
return nil
})
job5 := &model.Job{ID: 5, TableID: 2}
kv.RunInNewTxn(store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
err := buildJobDependence(t, job5)
c.Assert(err, IsNil)
c.Assert(job5.DependentID, Equals, int64(3))
return nil
})
job8 := &model.Job{ID: 8, TableID: 3}
kv.RunInNewTxn(store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
err := buildJobDependence(t, job8)
c.Assert(err, IsNil)
c.Assert(job8.DependentID, Equals, int64(0))
return nil
})
}
20 changes: 20 additions & 0 deletions meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,26 @@ func (m *Meta) DDLJobQueueLen() (int64, error) {
return m.txn.LLen(mDDLJobListKey)
}

// GetAllDDLJobs gets all DDL Jobs.
func (m *Meta) GetAllDDLJobs() ([]*model.Job, error) {
values, err := m.txn.LGetAll(mDDLJobListKey)
if err != nil || values == nil {
return nil, errors.Trace(err)
}

jobs := make([]*model.Job, 0, len(values))
for _, val := range values {
job := &model.Job{}
err = job.Decode(val)
if err != nil {
return nil, errors.Trace(err)
}
jobs = append(jobs, job)
}

return jobs, nil
}

func (m *Meta) jobIDKey(id int64) []byte {
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, uint64(id))
Expand Down
10 changes: 10 additions & 0 deletions meta/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,16 @@ func (s *testSuite) TestDDL(c *C) {
lastID = job.ID
}

// Test GetAllDDLJobs.
err = t.EnQueueDDLJob(job)
job1 := &model.Job{ID: 2}
err = t.EnQueueDDLJob(job1)
c.Assert(err, IsNil)
jobs, err := t.GetAllDDLJobs()
c.Assert(err, IsNil)
expectJobs := []*model.Job{job, job1}
c.Assert(jobs, DeepEquals, expectJobs)

err = txn.Commit(context.Background())
c.Assert(err, IsNil)
}
37 changes: 37 additions & 0 deletions model/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ type Job struct {
// StartTS uses timestamp allocated by TSO.
// Now it's the TS when we put the job to TiKV queue.
StartTS uint64 `json:"start_ts"`
// DependentID is the job's ID that the current job depends on.
DependentID int64 `json:"related_id"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/DependentID/DependencyID
s/related_id/dependency_id

// Query string of the ddl job.
Query string `json:"query"`
BinlogInfo *HistoryInfo `json:"binlog"`
Expand Down Expand Up @@ -213,6 +215,41 @@ func (job *Job) String() string {
job.ID, job.Type, job.State, job.SchemaState, job.SchemaID, job.TableID, rowCount, len(job.Args), tsConvert2Time(job.StartTS), job.Error, job.ErrorCount, job.SnapshotVer)
}

func (job *Job) hasDependentSchema(other *Job) (bool, error) {
if other.Type == ActionDropSchema || other.Type == ActionCreateSchema {
if other.SchemaID == job.SchemaID {
return true, nil
}
if job.Type == ActionRenameTable {
var oldSchemaID int64
if err := job.DecodeArgs(&oldSchemaID); err != nil {
return false, errors.Trace(err)
}
if other.SchemaID == oldSchemaID {
return true, nil
}
}
}
return false, nil
}

// IsDependentOn returns whether job depends on other.
func (job *Job) IsDependentOn(other *Job) (bool, error) {
isDependent, err := job.hasDependentSchema(other)
if err != nil || isDependent {
return isDependent, errors.Trace(err)
}
isDependent, err = other.hasDependentSchema(job)
if err != nil || isDependent {
return isDependent, errors.Trace(err)
}

if other.TableID == job.TableID {
return true, nil
}
return false, nil
}

// IsFinished returns whether job is finished or not.
// If the job state is Done or Cancelled, it is finished.
func (job *Job) IsFinished() bool {
Expand Down
33 changes: 33 additions & 0 deletions model/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package model

import (
"encoding/json"
"fmt"
"testing"
"time"
Expand Down Expand Up @@ -142,12 +143,44 @@ func (*testModelSuite) TestJobCodec(c *C) {
}
job := &Job{
ID: 1,
TableID: 2,
SchemaID: 1,
BinlogInfo: &HistoryInfo{},
Args: []interface{}{NewCIStr("a"), A{Name: "abc"}},
}
job.BinlogInfo.AddDBInfo(123, &DBInfo{ID: 1, Name: NewCIStr("test_history_db")})
job.BinlogInfo.AddTableInfo(123, &TableInfo{ID: 1, Name: NewCIStr("test_history_tbl")})

// Test IsDependentOn.
// job: table ID is 2
// job1: table ID is 2
var err error
job1 := &Job{
ID: 2,
TableID: 2,
SchemaID: 1,
Type: ActionRenameTable,
BinlogInfo: &HistoryInfo{},
Args: []interface{}{int64(3), NewCIStr("new_table_name")},
}
job1.RawArgs, err = json.Marshal(job1.Args)
c.Assert(err, IsNil)
isDependent, err := job.IsDependentOn(job1)
c.Assert(err, IsNil)
c.Assert(isDependent, IsTrue)
// job1: rename table, old schema ID is 3
// job2: create schema, schema ID is 3
job2 := &Job{
ID: 3,
TableID: 3,
SchemaID: 3,
Type: ActionCreateSchema,
BinlogInfo: &HistoryInfo{},
}
isDependent, err = job2.IsDependentOn(job1)
c.Assert(err, IsNil)
c.Assert(isDependent, IsTrue)

c.Assert(job.IsCancelled(), Equals, false)
b, err := job.Encode(false)
c.Assert(err, IsNil)
Expand Down
20 changes: 20 additions & 0 deletions structure/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,26 @@ func (t *TxStructure) LLen(key []byte) (int64, error) {
return meta.RIndex - meta.LIndex, errors.Trace(err)
}

// LGetAll gets all elements of this list in order from right to left.
func (t *TxStructure) LGetAll(key []byte) ([][]byte, error) {
metaKey := t.encodeListMetaKey(key)
meta, err := t.loadListMeta(metaKey)
if err != nil || meta.IsEmpty() {
return nil, errors.Trace(err)
}

length := int(meta.RIndex - meta.LIndex)
elements := make([][]byte, 0, length)
for index := meta.RIndex - 1; index >= meta.LIndex; index-- {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you be sure it's ordered.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use RPush to enqueue, so we know this order.

e, err := t.reader.Get(t.encodeListDataKey(key, index))
if err != nil {
return nil, errors.Trace(err)
}
elements = append(elements, e)
}
return elements, nil
}

// LIndex gets an element from a list by its index.
func (t *TxStructure) LIndex(key []byte, index int64) ([]byte, error) {
metaKey := t.encodeListMetaKey(key)
Expand Down
12 changes: 11 additions & 1 deletion structure/structure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,21 @@ func (s *testTxStructureSuite) TestList(c *C) {
err = tx.LPush(key, []byte("3"), []byte("2"), []byte("1"))
c.Assert(err, IsNil)

// Test LGetAll.
err = tx.LPush(key, []byte("11"))
c.Assert(err, IsNil)
values, err := tx.LGetAll(key)
c.Assert(err, IsNil)
c.Assert(values, DeepEquals, [][]byte{[]byte("3"), []byte("2"), []byte("1"), []byte("11")})
value, err := tx.LPop(key)
c.Assert(err, IsNil)
c.Assert(value, DeepEquals, []byte("11"))

l, err := tx.LLen(key)
c.Assert(err, IsNil)
c.Assert(l, Equals, int64(3))

value, err := tx.LIndex(key, 1)
value, err = tx.LIndex(key, 1)
c.Assert(err, IsNil)
c.Assert(value, DeepEquals, []byte("2"))

Expand Down