diff --git a/ddl/db_change_test.go b/ddl/db_change_test.go index 92f1bed1c07af..055e2c0828b62 100644 --- a/ddl/db_change_test.go +++ b/ddl/db_change_test.go @@ -664,7 +664,7 @@ func (s *testStateChangeSuite) TestShowIndex(c *C) { c.Assert(err, IsNil) _, err = s.se.Execute(context.Background(), `create table tr( - id int, name varchar(50), + id int, name varchar(50), purchased date ) partition by range( year(purchased) ) ( @@ -829,25 +829,7 @@ func (s *testStateChangeSuite) TestParallelCreateAndRename(c *C) { type checkRet func(c *C, err1, err2 error) -func (s *testStateChangeSuite) testControlParallelExecSQL(c *C, sql1, sql2 string, f checkRet) { - _, err := s.se.Execute(context.Background(), "use test_db_state") - c.Assert(err, IsNil) - _, err = s.se.Execute(context.Background(), "create table t(a int, b int, c int, d int auto_increment,e int, index idx1(d), index idx2(d,e))") - c.Assert(err, IsNil) - if len(s.preSQL) != 0 { - _, err := s.se.Execute(context.Background(), s.preSQL) - c.Assert(err, IsNil) - } - defer s.se.Execute(context.Background(), "drop table t") - - _, err = s.se.Execute(context.Background(), "drop database if exists t_part") - c.Assert(err, IsNil) - s.se.Execute(context.Background(), `create table t_part (a int key) - partition by range(a) ( - partition p0 values less than (10), - partition p1 values less than (20) - );`) - +func (s *testStateChangeSuite) prepareTestControlParallelExecSQL(c *C) (session.Session, session.Session, chan struct{}, ddl.Callback) { callback := &ddl.TestDDLCallback{} times := 0 callback.OnJobUpdatedExported = func(job *model.Job) { @@ -873,12 +855,8 @@ func (s *testStateChangeSuite) testControlParallelExecSQL(c *C, sql1, sql2 strin } d := s.dom.DDL() originalCallback := d.GetHook() - defer d.(ddl.DDLForTest).SetHook(originalCallback) d.(ddl.DDLForTest).SetHook(callback) - wg := sync.WaitGroup{} - var err1 error - var err2 error se, err := session.CreateSession(s.store) c.Assert(err, IsNil) _, err = se.Execute(context.Background(), "use test_db_state") @@ -887,7 +865,6 @@ func (s *testStateChangeSuite) testControlParallelExecSQL(c *C, sql1, sql2 strin c.Assert(err, IsNil) _, err = se1.Execute(context.Background(), "use test_db_state") c.Assert(err, IsNil) - wg.Add(2) ch := make(chan struct{}) // Make sure the sql1 is put into the DDLJobQueue. go func() { @@ -909,6 +886,35 @@ func (s *testStateChangeSuite) testControlParallelExecSQL(c *C, sql1, sql2 strin time.Sleep(5 * time.Millisecond) } }() + return se, se1, ch, originalCallback +} + +func (s *testStateChangeSuite) testControlParallelExecSQL(c *C, sql1, sql2 string, f checkRet) { + _, err := s.se.Execute(context.Background(), "use test_db_state") + c.Assert(err, IsNil) + _, err = s.se.Execute(context.Background(), "create table t(a int, b int, c int, d int auto_increment,e int, index idx1(d), index idx2(d,e))") + c.Assert(err, IsNil) + if len(s.preSQL) != 0 { + _, err := s.se.Execute(context.Background(), s.preSQL) + c.Assert(err, IsNil) + } + defer s.se.Execute(context.Background(), "drop table t") + + _, err = s.se.Execute(context.Background(), "drop database if exists t_part") + c.Assert(err, IsNil) + s.se.Execute(context.Background(), `create table t_part (a int key) + partition by range(a) ( + partition p0 values less than (10), + partition p1 values less than (20) + );`) + + se, se1, ch, originalCallback := s.prepareTestControlParallelExecSQL(c) + defer s.dom.DDL().(ddl.DDLForTest).SetHook(originalCallback) + + var err1 error + var err2 error + wg := sync.WaitGroup{} + wg.Add(2) go func() { defer wg.Done() _, err1 = se.Execute(context.Background(), sql1) @@ -923,6 +929,42 @@ func (s *testStateChangeSuite) testControlParallelExecSQL(c *C, sql1, sql2 strin f(c, err1, err2) } +func (s *testStateChangeSuite) TestParallelUpdateTableReplica(c *C) { + ctx := context.Background() + _, err := s.se.Execute(context.Background(), "use test_db_state") + c.Assert(err, IsNil) + _, err = s.se.Execute(ctx, "drop table if exists t1;") + c.Assert(err, IsNil) + _, err = s.se.Execute(ctx, "create table t1 (a int);") + c.Assert(err, IsNil) + _, err = s.se.Execute(ctx, "alter table t1 set tiflash replica 3 location labels 'a','b';") + c.Assert(err, IsNil) + + se, se1, ch, originalCallback := s.prepareTestControlParallelExecSQL(c) + defer s.dom.DDL().(ddl.DDLForTest).SetHook(originalCallback) + + t1 := testGetTableByName(c, se, "test_db_state", "t1") + + var err1 error + var err2 error + wg := sync.WaitGroup{} + wg.Add(2) + go func() { + defer wg.Done() + // Mock for table tiflash replica was available. + err1 = domain.GetDomain(se).DDL().UpdateTableReplicaInfo(se, t1.Meta().ID, true) + }() + go func() { + defer wg.Done() + <-ch + // Mock for table tiflash replica was available. + err2 = domain.GetDomain(se1).DDL().UpdateTableReplicaInfo(se1, t1.Meta().ID, true) + }() + wg.Wait() + c.Assert(err1, IsNil) + c.Assert(err2.Error(), Equals, "[ddl:-1]the replica available status of table t1 is already updated") +} + func (s *testStateChangeSuite) testParallelExecSQL(c *C, sql string) { se, err := session.CreateSession(s.store) c.Assert(err, IsNil) diff --git a/ddl/table.go b/ddl/table.go index c46cfdcd8a8d8..3193345526485 100644 --- a/ddl/table.go +++ b/ddl/table.go @@ -726,7 +726,8 @@ func onUpdateFlashReplicaStatus(t *meta.Meta, job *model.Job) (ver int64, _ erro } if tblInfo.TiFlashReplica == nil || (tblInfo.ID == physicalID && tblInfo.TiFlashReplica.Available == available) || (tblInfo.ID != physicalID && available == tblInfo.TiFlashReplica.IsPartitionAvailable(physicalID)) { - return ver, nil + job.State = model.JobStateCancelled + return ver, errors.Errorf("the replica available status of table %s is already updated", tblInfo.Name.String()) } if tblInfo.ID == physicalID {