Skip to content

Commit

Permalink
ddl: fix concurrent column type changes(with changing data) that caus…
Browse files Browse the repository at this point in the history
…e schema and data inconsistencies (#31051)

close #31048
  • Loading branch information
zimulala authored Dec 28, 2021
1 parent 7688d37 commit 4ca3b02
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 4 deletions.
10 changes: 7 additions & 3 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -847,7 +847,7 @@ func (w *worker) onModifyColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in
if job.IsRollingback() {
// For those column-type-change jobs which don't reorg the data.
if !needChangeColumnData(oldCol, jobParam.newCol) {
return rollbackModifyColumnJob(t, tblInfo, job, oldCol, jobParam.modifyColumnTp)
return rollbackModifyColumnJob(t, tblInfo, job, jobParam.newCol, oldCol, jobParam.modifyColumnTp)
}
// For those column-type-change jobs which reorg the data.
return rollbackModifyColumnJobWithData(t, tblInfo, job, oldCol, jobParam)
Expand Down Expand Up @@ -1483,6 +1483,10 @@ func updateChangingInfo(changingCol *model.ColumnInfo, changingIdxs []*model.Ind
func (w *worker) doModifyColumn(
d *ddlCtx, t *meta.Meta, job *model.Job, dbInfo *model.DBInfo, tblInfo *model.TableInfo,
newCol, oldCol *model.ColumnInfo, pos *ast.ColumnPosition) (ver int64, _ error) {
if oldCol.ID != newCol.ID {
job.State = model.JobStateRollingback
return ver, errKeyColumnDoesNotExits.GenWithStack("column %s id %d does not exist, this column may have been updated by other DDL ran in parallel", oldCol.Name, newCol.ID)
}
// Column from null to not null.
if !mysql.HasNotNullFlag(oldCol.Flag) && mysql.HasNotNullFlag(newCol.Flag) {
noPreventNullFlag := !mysql.HasPreventNullInsertFlag(oldCol.Flag)
Expand Down Expand Up @@ -1804,9 +1808,9 @@ func checkAddColumnTooManyColumns(colNum int) error {
}

// rollbackModifyColumnJob rollbacks the job when an error occurs.
func rollbackModifyColumnJob(t *meta.Meta, tblInfo *model.TableInfo, job *model.Job, oldCol *model.ColumnInfo, modifyColumnTp byte) (ver int64, _ error) {
func rollbackModifyColumnJob(t *meta.Meta, tblInfo *model.TableInfo, job *model.Job, newCol, oldCol *model.ColumnInfo, modifyColumnTp byte) (ver int64, _ error) {
var err error
if modifyColumnTp == mysql.TypeNull {
if oldCol.ID == newCol.ID && modifyColumnTp == mysql.TypeNull {
// field NotNullFlag flag reset.
tblInfo.Columns[oldCol.Offset].Flag = oldCol.Flag &^ mysql.NotNullFlag
// field PreventNullInsertFlag flag reset.
Expand Down
53 changes: 52 additions & 1 deletion ddl/db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1062,6 +1062,54 @@ func (s *testStateChangeSuite) TestParallelAlterModifyColumn(c *C) {
s.testControlParallelExecSQL(c, sql, sql, f)
}

func (s *testStateChangeSuite) TestParallelAlterModifyColumnWithData(c *C) {
sql := "ALTER TABLE t MODIFY COLUMN c int;"
f := func(c *C, err1, err2 error) {
c.Assert(err1, IsNil)
c.Assert(err2.Error(), Equals, "[ddl:1072]column c id 3 does not exist, this column may have been updated by other DDL ran in parallel")
rs, err := s.se.Execute(context.Background(), "select * from t")
c.Assert(err, IsNil)
sRows, err := session.ResultSetToStringSlice(context.Background(), s.se, rs[0])
c.Assert(err, IsNil)
c.Assert(sRows[0][2], Equals, "3")
c.Assert(rs[0].Close(), IsNil)
_, err = s.se.Execute(context.Background(), "insert into t values(11, 22, 33.3, 44, 55)")
c.Assert(err, IsNil)
rs, err = s.se.Execute(context.Background(), "select * from t")
c.Assert(err, IsNil)
sRows, err = session.ResultSetToStringSlice(context.Background(), s.se, rs[0])
c.Assert(err, IsNil)
c.Assert(sRows[1][2], Equals, "33")
c.Assert(rs[0].Close(), IsNil)
}
s.testControlParallelExecSQL(c, sql, sql, f)
}

func (s *testStateChangeSuite) TestParallelAlterModifyColumnToNotNullWithData(c *C) {
sql := "ALTER TABLE t MODIFY COLUMN c int not null;"
f := func(c *C, err1, err2 error) {
c.Assert(err1, IsNil)
c.Assert(err2.Error(), Equals, "[ddl:1072]column c id 3 does not exist, this column may have been updated by other DDL ran in parallel")
rs, err := s.se.Execute(context.Background(), "select * from t")
c.Assert(err, IsNil)
sRows, err := session.ResultSetToStringSlice(context.Background(), s.se, rs[0])
c.Assert(err, IsNil)
c.Assert(sRows[0][2], Equals, "3")
c.Assert(rs[0].Close(), IsNil)
_, err = s.se.Execute(context.Background(), "insert into t values(11, 22, null, 44, 55)")
c.Assert(err, NotNil)
_, err = s.se.Execute(context.Background(), "insert into t values(11, 22, 33.3, 44, 55)")
c.Assert(err, IsNil)
rs, err = s.se.Execute(context.Background(), "select * from t")
c.Assert(err, IsNil)
sRows, err = session.ResultSetToStringSlice(context.Background(), s.se, rs[0])
c.Assert(err, IsNil)
c.Assert(sRows[1][2], Equals, "33")
c.Assert(rs[0].Close(), IsNil)
}
s.testControlParallelExecSQL(c, sql, sql, f)
}

func (s *testStateChangeSuite) TestParallelAddGeneratedColumnAndAlterModifyColumn(c *C) {
sql1 := "ALTER TABLE t ADD COLUMN f INT GENERATED ALWAYS AS(a+1);"
sql2 := "ALTER TABLE t MODIFY COLUMN a tinyint;"
Expand Down Expand Up @@ -1337,12 +1385,15 @@ func (s *testStateChangeSuiteBase) prepareTestControlParallelExecSQL(c *C) (sess
func (s *testStateChangeSuiteBase) testControlParallelExecSQL(c *C, sql1, sql2 string, f checkRet) {
_, err := s.se.Execute(context.Background(), "use test_db_state")
c.Assert(err, IsNil)
_, err = s.se.Execute(context.Background(), "create table t(a int, b int, c int, d int auto_increment,e int, index idx1(d), index idx2(d,e))")
_, err = s.se.Execute(context.Background(), "create table t(a int, b int, c double default null, d int auto_increment,e int, index idx1(d), index idx2(d,e))")
c.Assert(err, IsNil)
if len(s.preSQL) != 0 {
_, err := s.se.Execute(context.Background(), s.preSQL)
c.Assert(err, IsNil)
}
_, err = s.se.Execute(context.Background(), "insert into t values(1, 2, 3.1234, 4, 5)")
c.Assert(err, IsNil)

defer func() {
_, err := s.se.Execute(context.Background(), "drop table t")
c.Assert(err, IsNil)
Expand Down

0 comments on commit 4ca3b02

Please sign in to comment.