Skip to content

Commit

Permalink
ddl: fix bug of updating tiflash replica status ddl job been stuck. (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 committed Mar 5, 2020
1 parent 56ca69a commit 36788e0
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 26 deletions.
92 changes: 67 additions & 25 deletions ddl/db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,7 @@ func (s *testStateChangeSuite) TestShowIndex(c *C) {
c.Assert(err, IsNil)

_, err = s.se.Execute(context.Background(), `create table tr(
id int, name varchar(50),
id int, name varchar(50),
purchased date
)
partition by range( year(purchased) ) (
Expand Down Expand Up @@ -829,25 +829,7 @@ func (s *testStateChangeSuite) TestParallelCreateAndRename(c *C) {

type checkRet func(c *C, err1, err2 error)

func (s *testStateChangeSuite) testControlParallelExecSQL(c *C, sql1, sql2 string, f checkRet) {
_, err := s.se.Execute(context.Background(), "use test_db_state")
c.Assert(err, IsNil)
_, err = s.se.Execute(context.Background(), "create table t(a int, b int, c int, d int auto_increment,e int, index idx1(d), index idx2(d,e))")
c.Assert(err, IsNil)
if len(s.preSQL) != 0 {
_, err := s.se.Execute(context.Background(), s.preSQL)
c.Assert(err, IsNil)
}
defer s.se.Execute(context.Background(), "drop table t")

_, err = s.se.Execute(context.Background(), "drop database if exists t_part")
c.Assert(err, IsNil)
s.se.Execute(context.Background(), `create table t_part (a int key)
partition by range(a) (
partition p0 values less than (10),
partition p1 values less than (20)
);`)

func (s *testStateChangeSuite) prepareTestControlParallelExecSQL(c *C) (session.Session, session.Session, chan struct{}, ddl.Callback) {
callback := &ddl.TestDDLCallback{}
times := 0
callback.OnJobUpdatedExported = func(job *model.Job) {
Expand All @@ -873,12 +855,8 @@ func (s *testStateChangeSuite) testControlParallelExecSQL(c *C, sql1, sql2 strin
}
d := s.dom.DDL()
originalCallback := d.GetHook()
defer d.(ddl.DDLForTest).SetHook(originalCallback)
d.(ddl.DDLForTest).SetHook(callback)

wg := sync.WaitGroup{}
var err1 error
var err2 error
se, err := session.CreateSession(s.store)
c.Assert(err, IsNil)
_, err = se.Execute(context.Background(), "use test_db_state")
Expand All @@ -887,7 +865,6 @@ func (s *testStateChangeSuite) testControlParallelExecSQL(c *C, sql1, sql2 strin
c.Assert(err, IsNil)
_, err = se1.Execute(context.Background(), "use test_db_state")
c.Assert(err, IsNil)
wg.Add(2)
ch := make(chan struct{})
// Make sure the sql1 is put into the DDLJobQueue.
go func() {
Expand All @@ -909,6 +886,35 @@ func (s *testStateChangeSuite) testControlParallelExecSQL(c *C, sql1, sql2 strin
time.Sleep(5 * time.Millisecond)
}
}()
return se, se1, ch, originalCallback
}

func (s *testStateChangeSuite) testControlParallelExecSQL(c *C, sql1, sql2 string, f checkRet) {
_, err := s.se.Execute(context.Background(), "use test_db_state")
c.Assert(err, IsNil)
_, err = s.se.Execute(context.Background(), "create table t(a int, b int, c int, d int auto_increment,e int, index idx1(d), index idx2(d,e))")
c.Assert(err, IsNil)
if len(s.preSQL) != 0 {
_, err := s.se.Execute(context.Background(), s.preSQL)
c.Assert(err, IsNil)
}
defer s.se.Execute(context.Background(), "drop table t")

_, err = s.se.Execute(context.Background(), "drop database if exists t_part")
c.Assert(err, IsNil)
s.se.Execute(context.Background(), `create table t_part (a int key)
partition by range(a) (
partition p0 values less than (10),
partition p1 values less than (20)
);`)

se, se1, ch, originalCallback := s.prepareTestControlParallelExecSQL(c)
defer s.dom.DDL().(ddl.DDLForTest).SetHook(originalCallback)

var err1 error
var err2 error
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
_, err1 = se.Execute(context.Background(), sql1)
Expand All @@ -923,6 +929,42 @@ func (s *testStateChangeSuite) testControlParallelExecSQL(c *C, sql1, sql2 strin
f(c, err1, err2)
}

func (s *testStateChangeSuite) TestParallelUpdateTableReplica(c *C) {
ctx := context.Background()
_, err := s.se.Execute(context.Background(), "use test_db_state")
c.Assert(err, IsNil)
_, err = s.se.Execute(ctx, "drop table if exists t1;")
c.Assert(err, IsNil)
_, err = s.se.Execute(ctx, "create table t1 (a int);")
c.Assert(err, IsNil)
_, err = s.se.Execute(ctx, "alter table t1 set tiflash replica 3 location labels 'a','b';")
c.Assert(err, IsNil)

se, se1, ch, originalCallback := s.prepareTestControlParallelExecSQL(c)
defer s.dom.DDL().(ddl.DDLForTest).SetHook(originalCallback)

t1 := testGetTableByName(c, se, "test_db_state", "t1")

var err1 error
var err2 error
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
// Mock for table tiflash replica was available.
err1 = domain.GetDomain(se).DDL().UpdateTableReplicaInfo(se, t1.Meta().ID, true)
}()
go func() {
defer wg.Done()
<-ch
// Mock for table tiflash replica was available.
err2 = domain.GetDomain(se1).DDL().UpdateTableReplicaInfo(se1, t1.Meta().ID, true)
}()
wg.Wait()
c.Assert(err1, IsNil)
c.Assert(err2.Error(), Equals, "[ddl:-1]the replica available status of table t1 is already updated")
}

func (s *testStateChangeSuite) testParallelExecSQL(c *C, sql string) {
se, err := session.CreateSession(s.store)
c.Assert(err, IsNil)
Expand Down
3 changes: 2 additions & 1 deletion ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,8 @@ func onUpdateFlashReplicaStatus(t *meta.Meta, job *model.Job) (ver int64, _ erro
}
if tblInfo.TiFlashReplica == nil || (tblInfo.ID == physicalID && tblInfo.TiFlashReplica.Available == available) ||
(tblInfo.ID != physicalID && available == tblInfo.TiFlashReplica.IsPartitionAvailable(physicalID)) {
return ver, nil
job.State = model.JobStateCancelled
return ver, errors.Errorf("the replica available status of table %s is already updated", tblInfo.Name.String())
}

if tblInfo.ID == physicalID {
Expand Down

0 comments on commit 36788e0

Please sign in to comment.