diff --git a/ddl/column.go b/ddl/column.go index 0bebd5ac6ea60..0fc277e6ac78b 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -847,7 +847,7 @@ func (w *worker) onModifyColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in if job.IsRollingback() { // For those column-type-change jobs which don't reorg the data. if !needChangeColumnData(oldCol, jobParam.newCol) { - return rollbackModifyColumnJob(t, tblInfo, job, oldCol, jobParam.modifyColumnTp) + return rollbackModifyColumnJob(t, tblInfo, job, jobParam.newCol, oldCol, jobParam.modifyColumnTp) } // For those column-type-change jobs which reorg the data. return rollbackModifyColumnJobWithData(t, tblInfo, job, oldCol, jobParam) @@ -1483,6 +1483,10 @@ func updateChangingInfo(changingCol *model.ColumnInfo, changingIdxs []*model.Ind func (w *worker) doModifyColumn( d *ddlCtx, t *meta.Meta, job *model.Job, dbInfo *model.DBInfo, tblInfo *model.TableInfo, newCol, oldCol *model.ColumnInfo, pos *ast.ColumnPosition) (ver int64, _ error) { + if oldCol.ID != newCol.ID { + job.State = model.JobStateRollingback + return ver, errKeyColumnDoesNotExits.GenWithStack("column %s id %d does not exist, this column may have been updated by other DDL ran in parallel", oldCol.Name, newCol.ID) + } // Column from null to not null. if !mysql.HasNotNullFlag(oldCol.Flag) && mysql.HasNotNullFlag(newCol.Flag) { noPreventNullFlag := !mysql.HasPreventNullInsertFlag(oldCol.Flag) @@ -1804,9 +1808,9 @@ func checkAddColumnTooManyColumns(colNum int) error { } // rollbackModifyColumnJob rollbacks the job when an error occurs. -func rollbackModifyColumnJob(t *meta.Meta, tblInfo *model.TableInfo, job *model.Job, oldCol *model.ColumnInfo, modifyColumnTp byte) (ver int64, _ error) { +func rollbackModifyColumnJob(t *meta.Meta, tblInfo *model.TableInfo, job *model.Job, newCol, oldCol *model.ColumnInfo, modifyColumnTp byte) (ver int64, _ error) { var err error - if modifyColumnTp == mysql.TypeNull { + if oldCol.ID == newCol.ID && modifyColumnTp == mysql.TypeNull { // field NotNullFlag flag reset. tblInfo.Columns[oldCol.Offset].Flag = oldCol.Flag &^ mysql.NotNullFlag // field PreventNullInsertFlag flag reset. diff --git a/ddl/db_change_test.go b/ddl/db_change_test.go index 1873cb02d9fe0..6b6e9d8132b0a 100644 --- a/ddl/db_change_test.go +++ b/ddl/db_change_test.go @@ -1062,6 +1062,54 @@ func (s *testStateChangeSuite) TestParallelAlterModifyColumn(c *C) { s.testControlParallelExecSQL(c, sql, sql, f) } +func (s *testStateChangeSuite) TestParallelAlterModifyColumnWithData(c *C) { + sql := "ALTER TABLE t MODIFY COLUMN c int;" + f := func(c *C, err1, err2 error) { + c.Assert(err1, IsNil) + c.Assert(err2.Error(), Equals, "[ddl:1072]column c id 3 does not exist, this column may have been updated by other DDL ran in parallel") + rs, err := s.se.Execute(context.Background(), "select * from t") + c.Assert(err, IsNil) + sRows, err := session.ResultSetToStringSlice(context.Background(), s.se, rs[0]) + c.Assert(err, IsNil) + c.Assert(sRows[0][2], Equals, "3") + c.Assert(rs[0].Close(), IsNil) + _, err = s.se.Execute(context.Background(), "insert into t values(11, 22, 33.3, 44, 55)") + c.Assert(err, IsNil) + rs, err = s.se.Execute(context.Background(), "select * from t") + c.Assert(err, IsNil) + sRows, err = session.ResultSetToStringSlice(context.Background(), s.se, rs[0]) + c.Assert(err, IsNil) + c.Assert(sRows[1][2], Equals, "33") + c.Assert(rs[0].Close(), IsNil) + } + s.testControlParallelExecSQL(c, sql, sql, f) +} + +func (s *testStateChangeSuite) TestParallelAlterModifyColumnToNotNullWithData(c *C) { + sql := "ALTER TABLE t MODIFY COLUMN c int not null;" + f := func(c *C, err1, err2 error) { + c.Assert(err1, IsNil) + c.Assert(err2.Error(), Equals, "[ddl:1072]column c id 3 does not exist, this column may have been updated by other DDL ran in parallel") + rs, err := s.se.Execute(context.Background(), "select * from t") + c.Assert(err, IsNil) + sRows, err := session.ResultSetToStringSlice(context.Background(), s.se, rs[0]) + c.Assert(err, IsNil) + c.Assert(sRows[0][2], Equals, "3") + c.Assert(rs[0].Close(), IsNil) + _, err = s.se.Execute(context.Background(), "insert into t values(11, 22, null, 44, 55)") + c.Assert(err, NotNil) + _, err = s.se.Execute(context.Background(), "insert into t values(11, 22, 33.3, 44, 55)") + c.Assert(err, IsNil) + rs, err = s.se.Execute(context.Background(), "select * from t") + c.Assert(err, IsNil) + sRows, err = session.ResultSetToStringSlice(context.Background(), s.se, rs[0]) + c.Assert(err, IsNil) + c.Assert(sRows[1][2], Equals, "33") + c.Assert(rs[0].Close(), IsNil) + } + s.testControlParallelExecSQL(c, sql, sql, f) +} + func (s *testStateChangeSuite) TestParallelAddGeneratedColumnAndAlterModifyColumn(c *C) { sql1 := "ALTER TABLE t ADD COLUMN f INT GENERATED ALWAYS AS(a+1);" sql2 := "ALTER TABLE t MODIFY COLUMN a tinyint;" @@ -1337,12 +1385,15 @@ func (s *testStateChangeSuiteBase) prepareTestControlParallelExecSQL(c *C) (sess func (s *testStateChangeSuiteBase) testControlParallelExecSQL(c *C, sql1, sql2 string, f checkRet) { _, err := s.se.Execute(context.Background(), "use test_db_state") c.Assert(err, IsNil) - _, err = s.se.Execute(context.Background(), "create table t(a int, b int, c int, d int auto_increment,e int, index idx1(d), index idx2(d,e))") + _, err = s.se.Execute(context.Background(), "create table t(a int, b int, c double default null, d int auto_increment,e int, index idx1(d), index idx2(d,e))") c.Assert(err, IsNil) if len(s.preSQL) != 0 { _, err := s.se.Execute(context.Background(), s.preSQL) c.Assert(err, IsNil) } + _, err = s.se.Execute(context.Background(), "insert into t values(1, 2, 3.1234, 4, 5)") + c.Assert(err, IsNil) + defer func() { _, err := s.se.Execute(context.Background(), "drop table t") c.Assert(err, IsNil)