Skip to content

Commit

Permalink
ddl: update the column's offset when we do modify column (#6274)
Browse files Browse the repository at this point in the history
* ddl: fix modify column bug
  • Loading branch information
zimulala authored Apr 12, 2018
1 parent ca627fe commit 28c21c9
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 10 deletions.
13 changes: 9 additions & 4 deletions ddl/callback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,18 @@ import (
type TestDDLCallback struct {
*BaseCallback

onJobRunBefore func(*model.Job)
onJobUpdated func(*model.Job)
OnJobUpdatedExported func(*model.Job)
onWatched func(ctx context.Context)
onJobRunBefore func(*model.Job)
OnJobRunBeforeExported func(*model.Job)
onJobUpdated func(*model.Job)
OnJobUpdatedExported func(*model.Job)
onWatched func(ctx context.Context)
}

func (tc *TestDDLCallback) OnJobRunBefore(job *model.Job) {
if tc.OnJobRunBeforeExported != nil {
tc.OnJobRunBeforeExported(job)
return
}
if tc.onJobRunBefore != nil {
tc.onJobRunBefore(job)
return
Expand Down
11 changes: 7 additions & 4 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ func (d *ddl) onModifyColumn(t *meta.Meta, job *model.Job) (ver int64, _ error)
}

// doModifyColumn updates the column information and reorders all columns.
func (d *ddl) doModifyColumn(t *meta.Meta, job *model.Job, col *model.ColumnInfo, oldName *model.CIStr, pos *ast.ColumnPosition) (ver int64, _ error) {
func (d *ddl) doModifyColumn(t *meta.Meta, job *model.Job, newCol *model.ColumnInfo, oldName *model.CIStr, pos *ast.ColumnPosition) (ver int64, _ error) {
tblInfo, err := getTableInfo(t, job, job.SchemaID)
if err != nil {
return ver, errors.Trace(err)
Expand All @@ -452,6 +452,9 @@ func (d *ddl) doModifyColumn(t *meta.Meta, job *model.Job, col *model.ColumnInfo
return ver, infoschema.ErrColumnNotExists.GenByArgs(oldName, tblInfo.Name)
}

// We need the latest column's offset and state. This information can be obtained from the store.
newCol.Offset = oldCol.Offset
newCol.State = oldCol.State
// Calculate column's new position.
oldPos, newPos := oldCol.Offset, oldCol.Offset
if pos.Tp == ast.ColumnPositionAfter {
Expand All @@ -477,10 +480,10 @@ func (d *ddl) doModifyColumn(t *meta.Meta, job *model.Job, col *model.ColumnInfo
}

columnChanged := make(map[string]*model.ColumnInfo)
columnChanged[oldName.L] = col
columnChanged[oldName.L] = newCol

if newPos == oldPos {
tblInfo.Columns[newPos] = col
tblInfo.Columns[newPos] = newCol
} else {
cols := tblInfo.Columns

Expand All @@ -490,7 +493,7 @@ func (d *ddl) doModifyColumn(t *meta.Meta, job *model.Job, col *model.ColumnInfo
} else {
copy(cols[oldPos:], cols[oldPos+1:newPos+1])
}
cols[newPos] = col
cols[newPos] = newCol

for i, col := range tblInfo.Columns {
if col.Offset != i {
Expand Down
2 changes: 0 additions & 2 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1316,8 +1316,6 @@ func (d *ddl) getModifiableColumnJob(ctx sessionctx.Context, ident ast.Ident, or

newCol := table.ToColumn(&model.ColumnInfo{
ID: col.ID,
Offset: col.Offset,
State: col.State,
OriginDefaultValue: col.OriginDefaultValue,
FieldType: *specNewColumn.Tp,
Name: specNewColumn.Name.Name,
Expand Down
72 changes: 72 additions & 0 deletions ddl/ddl_db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package ddl_test
import (
"fmt"
"strings"
"sync"
"time"

"github.com/juju/errors"
Expand All @@ -25,6 +26,7 @@ import (
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/session"
Expand Down Expand Up @@ -429,3 +431,73 @@ func (s *testStateChangeSuite) TestShowIndex(c *C) {
callback = &ddl.TestDDLCallback{}
d.SetHook(callback)
}

func (s *testStateChangeSuite) TestParallelDDL(c *C) {
defer testleak.AfterTest(c)()
_, err := s.se.Execute(context.Background(), "use test_db_state")
c.Assert(err, IsNil)
_, err = s.se.Execute(context.Background(), "create table t(a int, b int, c int)")
c.Assert(err, IsNil)
defer s.se.Execute(context.Background(), "drop table t")

callback := &ddl.TestDDLCallback{}
times := 0
callback.OnJobUpdatedExported = func(job *model.Job) {
if times != 0 {
return
}
var qLen int64
var err error
for {
kv.RunInNewTxn(s.store, false, func(txn kv.Transaction) error {
m := meta.NewMeta(txn)
qLen, err = m.DDLJobQueueLen()
if err != nil {
return err
}
return nil
})
if qLen == 2 {
break
}
time.Sleep(5 * time.Millisecond)
}
times++
}
d := s.dom.DDL()
d.SetHook(callback)

wg := sync.WaitGroup{}
var err1 error
var err2 error
se, err := session.CreateSession(s.store)
c.Assert(err, IsNil)
_, err = se.Execute(context.Background(), "use test_db_state")
c.Assert(err, IsNil)
se1, err := session.CreateSession(s.store)
c.Assert(err, IsNil)
_, err = se1.Execute(context.Background(), "use test_db_state")
c.Assert(err, IsNil)
go func() {
wg.Add(1)
defer wg.Done()
_, err1 = se.Execute(context.Background(), "ALTER TABLE t MODIFY COLUMN b int FIRST;")
}()

go func() {
wg.Add(1)
defer wg.Done()
_, err2 = se1.Execute(context.Background(), "ALTER TABLE t MODIFY COLUMN b int FIRST;")
}()

time.Sleep(1 * time.Second)
wg.Wait()
c.Assert(err1, IsNil)
c.Assert(err2, IsNil)

_, err = s.se.Execute(context.Background(), "select * from t")
c.Assert(err, IsNil)

callback = &ddl.TestDDLCallback{}
d.SetHook(callback)
}

0 comments on commit 28c21c9

Please sign in to comment.